From c6d44858d198f6552f3f2197233d53b7de7957f9 Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Fri, 24 May 2024 21:10:00 -0500 Subject: [PATCH] Implemented margo_monitor_dump (#273) * implemented margo_monitor_dump * capped the precision at 9 --- include/margo-monitoring.h | 25 ++- src/margo-default-monitoring.c | 211 ++++++++++++++++++-------- src/margo-monitoring.c | 12 ++ tests/unit-tests/margo-monitoring.c | 227 +++++++++++++++++++++------- 4 files changed, 358 insertions(+), 117 deletions(-) diff --git a/include/margo-monitoring.h b/include/margo-monitoring.h index 36ea6e3b..6edc3f5f 100644 --- a/include/margo-monitoring.h +++ b/include/margo-monitoring.h @@ -197,13 +197,18 @@ typedef const char* margo_monitor_user_args_t; X(USER, user) /* clang-format on */ +typedef void (*margo_monitor_dump_fn)(void*, const char*, size_t); + struct margo_monitor { void* uargs; void* (*initialize)(margo_instance_id mid, void*, struct json_object*); void (*finalize)(void* uargs); + hg_return_t (*dump)(void* uargs, + margo_monitor_dump_fn dump_fn, + void* dump_args, + bool reset); const char* (*name)(); struct json_object* (*config)(void* uargs); - #define X(__x__, __y__) \ void (*on_##__y__)(void*, double, margo_monitor_event_t, \ margo_monitor_##__y__##_args_t); @@ -450,6 +455,24 @@ struct margo_monitor_cb_args { hg_return_t ret; }; +/** + * @brief Call the dump_fn function with a serialized version of + * the monitor's state. If reset is set to true, this function will + * also reset the monitor's internal state (e.g. all past RPC activities + * will be removed). + * + * @param mid Margo instance ID + * @param dump_fn Dump function pointer + * @param uargs User arguments for the dump function pointer + * @param reset Whether to reset to monitor's state + * + * @return HG_SUCCESS or other error code + */ +hg_return_t margo_monitor_dump(margo_instance_id mid, + margo_monitor_dump_fn dump_fn, + void* uargs, + bool reset); + /** * @brief Invokes the on_user callback of the monitor registered with the * margo instance. diff --git a/src/margo-default-monitoring.c b/src/margo-default-monitoring.c index 588984bd..d4ed9f7c 100644 --- a/src/margo-default-monitoring.c +++ b/src/margo-default-monitoring.c @@ -1,4 +1,5 @@ /** + * (C) 2022 The University of Chicago * * See COPYRIGHT in top-level directory. @@ -169,7 +170,8 @@ typedef struct bulk_key { ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE(&((stats).mutex))); \ } while (0) -static struct json_object* statistics_to_json(const statistics_t* stats); +static struct json_object* statistics_to_json(const statistics_t* stats, + bool reset); /* Statistics related to the Mercury progress loop */ typedef struct hg_statistics { @@ -179,7 +181,8 @@ typedef struct hg_statistics { statistics_t trigger; } hg_statistics_t; -static struct json_object* hg_statistics_to_json(const hg_statistics_t* stats); +static struct json_object* hg_statistics_to_json(const hg_statistics_t* stats, + bool reset); /* Some statistics fields in the following structures will be * a pair of "duration" statistics (duration of the operation) @@ -203,7 +206,8 @@ typedef struct bulk_create_statistics { } bulk_create_statistics_t; static struct json_object* -bulk_create_statistics_to_json(const bulk_create_statistics_t* stats); +bulk_create_statistics_to_json(const bulk_create_statistics_t* stats, + bool reset); /* Statistics related to bulk transfers */ typedef struct bulk_transfer_statistics { @@ -216,7 +220,8 @@ typedef struct bulk_transfer_statistics { } bulk_transfer_statistics_t; static struct json_object* -bulk_transfer_statistics_to_json(const bulk_transfer_statistics_t* stats); +bulk_transfer_statistics_to_json(const bulk_transfer_statistics_t* stats, + bool reset); /* Statistics related to RPCs at their origin */ typedef struct origin_rpc_statistics { @@ -232,7 +237,7 @@ typedef struct origin_rpc_statistics { } origin_rpc_statistics_t; static struct json_object* -origin_rpc_statistics_to_json(const origin_rpc_statistics_t* stats); +origin_rpc_statistics_to_json(const origin_rpc_statistics_t* stats, bool reset); /* Statistics related to RPCs at their target */ typedef struct target_rpc_statistics { @@ -248,7 +253,7 @@ typedef struct target_rpc_statistics { } target_rpc_statistics_t; static struct json_object* -target_rpc_statistics_to_json(const target_rpc_statistics_t* stats); +target_rpc_statistics_to_json(const target_rpc_statistics_t* stats, bool reset); /* ======================================================================== * Time series structure definitions @@ -293,14 +298,16 @@ static rpc_time_series_t* find_or_add_time_series_for_rpc(struct default_monitor_state* monitor, hg_id_t rpc_id); static void free_all_time_series(struct default_monitor_state* monitor); -static struct json_object* rpc_time_series_to_json(rpc_time_series_t* rpc_ts); +static struct json_object* rpc_time_series_to_json(rpc_time_series_t* rpc_ts, + bool reset); static void update_rpc_time_series(struct default_monitor_state* monitor, double timestamp); static void update_pool_time_series(struct default_monitor_state* monitor, double timestamp); static struct json_object* -pool_time_series_to_json(const struct default_monitor_state* monitor); +pool_time_series_to_json(const struct default_monitor_state* monitor, + bool reset); /* ======================================================================== * RPC info @@ -385,12 +392,13 @@ typedef struct default_monitor_state { } default_monitor_state_t; static struct json_object* -monitor_statistics_to_json(const default_monitor_state_t* monitor); +monitor_statistics_to_json(const default_monitor_state_t* monitor, bool reset); static struct json_object* -monitor_time_series_to_json(const default_monitor_state_t* monitor); +monitor_time_series_to_json(const default_monitor_state_t* monitor, bool reset); static void -write_monitor_state_to_json_file(const default_monitor_state_t* monitor); +write_monitor_state_to_json_file(const default_monitor_state_t* monitor, + bool reset); /* A session is an object that will be associated with an hg_handle_t * when on_forward or on_rpc_handler is invoked, and will be destroyed @@ -551,6 +559,7 @@ static void* __margo_default_monitor_initialize(margo_instance_id mid, if (precision && json_object_is_type(precision, json_type_int)) { monitor->precision = json_object_get_int(precision); } + if (monitor->precision > 9) monitor->precision = 9; struct json_object* sampling = json_object_object_get(statistics, "sample_progress_every"); if (sampling && json_object_is_type(sampling, json_type_int)) { @@ -636,7 +645,7 @@ static void __margo_default_monitor_finalize(void* uargs) update_pool_time_series(monitor, ts); /* write JSON file */ - write_monitor_state_to_json_file(monitor); + write_monitor_state_to_json_file(monitor, false); /* free RPC info */ rpc_info_clear(monitor->rpc_info); @@ -1462,10 +1471,53 @@ __MONITOR_FN_EMPTY(prefinalize) __MONITOR_FN_EMPTY(finalize) __MONITOR_FN_EMPTY(user) +static hg_return_t __margo_default_monitor_dump(void* uargs, + margo_monitor_dump_fn dump_fn, + void* dump_args, + bool reset) +{ + const default_monitor_state_t* monitor + = (const default_monitor_state_t*)uargs; + + /* get printing precision */ + char double_format[] = "%.Xf"; + double_format[2] = (char)(48 + monitor->precision); + json_c_set_serialization_double_format(double_format, JSON_C_OPTION_GLOBAL); + + struct json_object* dump = json_object_new_object(); + + /* write statistics to json */ + if (monitor->enable_statistics) { + /* create JSON statistics */ + struct json_object* stats = monitor_statistics_to_json(monitor, reset); + json_object_object_add(dump, "stats", stats); + } + + /* write time series to json */ + if (monitor->enable_time_series) { + /* create JSON statistics */ + struct json_object* series + = monitor_time_series_to_json(monitor, reset); + json_object_object_add(dump, "series", series); + } + + if (dump_fn) { + /* convert to string and call the dump function */ + size_t json_len = 0; + const char* json_str = json_object_to_json_string_length( + dump, monitor->stats_pretty_json | JSON_C_TO_STRING_NOSLASHESCAPE, + &json_len); + dump_fn(dump_args, json_str, json_len); + } + json_object_put(dump); + return HG_SUCCESS; +} + struct margo_monitor __margo_default_monitor = {.uargs = NULL, .initialize = __margo_default_monitor_initialize, .finalize = __margo_default_monitor_finalize, + .dump = __margo_default_monitor_dump, .name = __margo_default_monitor_name, .config = __margo_default_monitor_config, #define X(__x__, __y__) .on_##__y__ = __margo_default_monitor_on_##__y__, @@ -1480,7 +1532,8 @@ struct margo_monitor* margo_default_monitor = &__margo_default_monitor; * ======================================================================== */ static void -write_monitor_state_to_json_file(const default_monitor_state_t* monitor) +write_monitor_state_to_json_file(const default_monitor_state_t* monitor, + bool reset) { if ((!monitor->filename_prefix) || (strlen(monitor->filename_prefix) == 0)) return; @@ -1515,7 +1568,7 @@ write_monitor_state_to_json_file(const default_monitor_state_t* monitor) goto finish_stats_file; } /* create JSON statistics */ - struct json_object* json = monitor_statistics_to_json(monitor); + struct json_object* json = monitor_statistics_to_json(monitor, reset); /* write statistics */ size_t json_len = 0; @@ -1550,7 +1603,7 @@ write_monitor_state_to_json_file(const default_monitor_state_t* monitor) goto finish_series_file; } /* create JSON statistics */ - struct json_object* json = monitor_time_series_to_json(monitor); + struct json_object* json = monitor_time_series_to_json(monitor, reset); /* write statistics */ size_t json_len = 0; @@ -1571,9 +1624,11 @@ write_monitor_state_to_json_file(const default_monitor_state_t* monitor) * Functions related to converting statistics into a json_object tree * ======================================================================== */ -static struct json_object* statistics_to_json(const statistics_t* stats) +static struct json_object* statistics_to_json(const statistics_t* stats, + bool reset) { struct json_object* json = json_object_new_object(); + ABT_mutex_spinlock(ABT_MUTEX_MEMORY_GET_HANDLE(&stats->mutex)); json_object_object_add_ex(json, "num", json_object_new_int(stats->num), JSON_C_OBJECT_ADD_KEY_IS_NEW); json_object_object_add_ex(json, "min", json_object_new_double(stats->min), @@ -1586,113 +1641,131 @@ static struct json_object* statistics_to_json(const statistics_t* stats) JSON_C_OBJECT_ADD_KEY_IS_NEW); json_object_object_add_ex(json, "sum", json_object_new_double(stats->sum), JSON_C_OBJECT_ADD_KEY_IS_NEW); + if (reset) { + ((statistics_t*)stats)->num = 0; + ((statistics_t*)stats)->min = 0; + ((statistics_t*)stats)->max = 0; + ((statistics_t*)stats)->avg = 0; + ((statistics_t*)stats)->var = 0; + ((statistics_t*)stats)->sum = 0; + } + ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE(&stats->mutex)); return json; } static struct json_object* statistics_pair_to_json(const statistics_t* stats, const char* name1, - const char* name2) + const char* name2, + bool reset) { struct json_object* json = json_object_new_object(); - json_object_object_add_ex(json, name1, statistics_to_json(stats), + json_object_object_add_ex(json, name1, statistics_to_json(stats, reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); - json_object_object_add_ex(json, name2, statistics_to_json(stats + 1), + json_object_object_add_ex(json, name2, statistics_to_json(stats + 1, reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); return json; } -static struct json_object* hg_statistics_to_json(const hg_statistics_t* stats) +static struct json_object* hg_statistics_to_json(const hg_statistics_t* stats, + bool reset) { struct json_object* json = json_object_new_object(); - json_object_object_add_ex(json, "progress_with_timeout", - statistics_to_json(&stats->progress_with_timeout), - JSON_C_OBJECT_ADD_KEY_IS_NEW); + json_object_object_add_ex( + json, "progress_with_timeout", + statistics_to_json(&stats->progress_with_timeout, reset), + JSON_C_OBJECT_ADD_KEY_IS_NEW); json_object_object_add_ex( json, "progress_timeout_value_msec", - statistics_to_json(&stats->progress_timeout_value), + statistics_to_json(&stats->progress_timeout_value, reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); json_object_object_add_ex( json, "progress_without_timeout", - statistics_to_json(&stats->progress_without_timeout), + statistics_to_json(&stats->progress_without_timeout, reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); json_object_object_add_ex(json, "trigger", - statistics_to_json(&stats->trigger), + statistics_to_json(&stats->trigger, reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); return json; } static struct json_object* -bulk_create_statistics_to_json(const bulk_create_statistics_t* stats) +bulk_create_statistics_to_json(const bulk_create_statistics_t* stats, + bool reset) { struct json_object* json = json_object_new_object(); json_object_object_add_ex(json, "duration", - statistics_to_json(&stats->duration), + statistics_to_json(&stats->duration, reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); - json_object_object_add_ex(json, "size", statistics_to_json(&stats->size), + json_object_object_add_ex(json, "size", + statistics_to_json(&stats->size, reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); return json; } static struct json_object* -bulk_transfer_statistics_to_json(const bulk_transfer_statistics_t* stats) +bulk_transfer_statistics_to_json(const bulk_transfer_statistics_t* stats, + bool reset) { struct json_object* json = json_object_new_object(); struct json_object* transfer = json_object_new_object(); json_object_object_add_ex(json, "itransfer", transfer, JSON_C_OBJECT_ADD_KEY_IS_NEW); json_object_object_add_ex(transfer, "duration", - statistics_to_json(&stats->transfer), + statistics_to_json(&stats->transfer, reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); json_object_object_add_ex(transfer, "size", - statistics_to_json(&stats->transfer_size), + statistics_to_json(&stats->transfer_size, reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); json_object_object_add_ex( json, "transfer_cb", statistics_pair_to_json(stats->transfer_cb, "duration", - "relative_timestamp_from_itransfer_start"), + "relative_timestamp_from_itransfer_start", + reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); json_object_object_add_ex( json, "itransfer_wait", statistics_pair_to_json(stats->wait, "duration", - "relative_timestamp_from_itransfer_end"), + "relative_timestamp_from_itransfer_end", reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); return json; } static struct json_object* -origin_rpc_statistics_to_json(const origin_rpc_statistics_t* stats) +origin_rpc_statistics_to_json(const origin_rpc_statistics_t* stats, bool reset) { struct json_object* json = json_object_new_object(); json_object_object_add_ex( json, "iforward", statistics_pair_to_json(stats->forward, "duration", - "relative_timestamp_from_create"), + "relative_timestamp_from_create", reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); json_object_object_add_ex( json, "forward_cb", statistics_pair_to_json(stats->forward_cb, "duration", - "relative_timestamp_from_iforward_start"), + "relative_timestamp_from_iforward_start", + reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); json_object_object_add_ex( json, "iforward_wait", statistics_pair_to_json(stats->wait, "duration", - "relative_timestamp_from_iforward_end"), + "relative_timestamp_from_iforward_end", reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); json_object_object_add_ex( json, "set_input", statistics_pair_to_json(stats->set_input, "duration", - "relative_timestamp_from_iforward_start"), + "relative_timestamp_from_iforward_start", + reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); json_object_object_add_ex( json, "get_output", statistics_pair_to_json(stats->get_output, "duration", - "relative_timestamp_from_wait_end"), + "relative_timestamp_from_wait_end", reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); return json; } static struct json_object* -target_rpc_statistics_to_json(const target_rpc_statistics_t* stats) +target_rpc_statistics_to_json(const target_rpc_statistics_t* stats, bool reset) { struct json_object* json = json_object_new_object(); @@ -1700,38 +1773,40 @@ target_rpc_statistics_to_json(const target_rpc_statistics_t* stats) json_object_object_add_ex(json, "handler", handler, JSON_C_OBJECT_ADD_KEY_IS_NEW); json_object_object_add_ex(handler, "duration", - statistics_to_json(&stats->handler), + statistics_to_json(&stats->handler, reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); json_object_object_add_ex( json, "ult", statistics_pair_to_json(stats->ult, "duration", - "relative_timestamp_from_handler_start"), + "relative_timestamp_from_handler_start", reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); json_object_object_add_ex( json, "irespond", statistics_pair_to_json(stats->respond, "duration", - "relative_timestamp_from_ult_start"), + "relative_timestamp_from_ult_start", reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); json_object_object_add_ex( json, "respond_cb", statistics_pair_to_json(stats->respond_cb, "duration", - "relative_timestamp_from_irespond_start"), + "relative_timestamp_from_irespond_start", + reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); json_object_object_add_ex( json, "irespond_wait", statistics_pair_to_json(stats->wait, "duration", - "relative_timestamp_from_irespond_end"), + "relative_timestamp_from_irespond_end", reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); json_object_object_add_ex( json, "set_output", statistics_pair_to_json(stats->set_output, "duration", - "relative_timestamp_from_irespond_start"), + "relative_timestamp_from_irespond_start", + reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); json_object_object_add_ex( json, "get_input", statistics_pair_to_json(stats->get_input, "duration", - "relative_timestamp_from_ult_start"), + "relative_timestamp_from_ult_start", reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); return json; } @@ -1799,7 +1874,7 @@ static void fill_json_with_rpc_info(struct json_object* rpc_json, } static struct json_object* -monitor_statistics_to_json(const default_monitor_state_t* state) +monitor_statistics_to_json(const default_monitor_state_t* state, bool reset) { struct json_object* json = json_object_new_object(); // add self address @@ -1814,7 +1889,7 @@ monitor_statistics_to_json(const default_monitor_state_t* state) JSON_C_OBJECT_ADD_KEY_IS_NEW); // mercury progress loop statistic json_object_object_add_ex(json, "progress_loop", - hg_statistics_to_json(&state->hg_stats), + hg_statistics_to_json(&state->hg_stats, reset), JSON_C_OBJECT_ADD_KEY_IS_NEW); // RPC statistics struct json_object* rpcs = json_object_new_object(); @@ -1847,7 +1922,7 @@ monitor_statistics_to_json(const default_monitor_state_t* state) sprintf(addr_key, "sent to %s", addr_info ? addr_info->name : ""); // convert origin_rpc_statistics to json - struct json_object* stats = origin_rpc_statistics_to_json(p); + struct json_object* stats = origin_rpc_statistics_to_json(p, reset); // add statistics to "origin" object with the address as key json_object_object_add_ex(origin, addr_key, stats, JSON_C_OBJECT_ADD_KEY_IS_NEW); @@ -1884,7 +1959,7 @@ monitor_statistics_to_json(const default_monitor_state_t* state) sprintf(addr_key, "received from %s", addr_info ? addr_info->name : ""); // convert target_rpc_statistics to json - struct json_object* stats = target_rpc_statistics_to_json(p); + struct json_object* stats = target_rpc_statistics_to_json(p, reset); // add statistics to "target" object with the address as key json_object_object_add_ex(target, addr_key, stats, JSON_C_OBJECT_ADD_KEY_IS_NEW); @@ -1927,7 +2002,8 @@ monitor_statistics_to_json(const default_monitor_state_t* state) struct json_object* bulk = json_object_object_get_or_create_object( received_from, "bulk"); // convert bulk_create_statistics to json - struct json_object* create = bulk_create_statistics_to_json(p); + struct json_object* create + = bulk_create_statistics_to_json(p, reset); // add statistics json_object_object_add_ex(bulk, "create", create, JSON_C_OBJECT_ADD_KEY_IS_NEW); @@ -1983,7 +2059,8 @@ monitor_statistics_to_json(const default_monitor_state_t* state) xfer_addr_info ? xfer_addr_info->name : ""); } // convert bulk_transfer_statistics to json - struct json_object* stats = bulk_transfer_statistics_to_json(p); + struct json_object* stats + = bulk_transfer_statistics_to_json(p, reset); // add statistics with the address as key json_object_object_add_ex(bulk, xfer_addr_key, stats, JSON_C_OBJECT_ADD_KEY_IS_NEW); @@ -1996,7 +2073,7 @@ monitor_statistics_to_json(const default_monitor_state_t* state) } static struct json_object* -monitor_time_series_to_json(const default_monitor_state_t* monitor) +monitor_time_series_to_json(const default_monitor_state_t* monitor, bool reset) { struct json_object* json = json_object_new_object(); // add self address @@ -2040,7 +2117,8 @@ monitor_time_series_to_json(const default_monitor_state_t* monitor) snprintf(key, 10, ""); } // create JSON object corresponding to this RPC's time series - struct json_object* json_ts = rpc_time_series_to_json(rpc_ts); + struct json_object* json_ts + = rpc_time_series_to_json(rpc_ts, reset); // add it to the RPC section json_object_object_add_ex(rpcs, key, json_ts, JSON_C_OBJECT_ADD_KEY_IS_NEW); @@ -2050,7 +2128,7 @@ monitor_time_series_to_json(const default_monitor_state_t* monitor) ABT_mutex_unlock( ABT_MUTEX_MEMORY_GET_HANDLE(&monitor->rpc_time_series_mtx)); } - struct json_object* pools = pool_time_series_to_json(monitor); + struct json_object* pools = pool_time_series_to_json(monitor, reset); json_object_object_add_ex(json, "pools", pools, JSON_C_OBJECT_ADD_KEY_IS_NEW); return json; @@ -2157,6 +2235,7 @@ static void time_series_clear(time_series_t* series) free(series->first_frame); series->first_frame = next; } + series->last_frame = NULL; } /* Free both RPC time series and Pool time series */ @@ -2211,7 +2290,8 @@ find_or_add_time_series_for_rpc(default_monitor_state_t* monitor, return ts; } -static struct json_object* rpc_time_series_to_json(rpc_time_series_t* rpc_ts) +static struct json_object* rpc_time_series_to_json(rpc_time_series_t* rpc_ts, + bool reset) { timedval_frame_t* frame = rpc_ts->rpc_count_series.first_frame; size_t array_size = 0; @@ -2255,6 +2335,11 @@ static struct json_object* rpc_time_series_to_json(rpc_time_series_t* rpc_ts) frame = frame->next; } + if (!reset) goto finish; + time_series_clear(&(rpc_ts->rpc_count_series)); + time_series_clear(&(rpc_ts->bulk_size_series)); + +finish: return json; } @@ -2285,7 +2370,7 @@ static void update_rpc_time_series(struct default_monitor_state* monitor, * ======================================================================== */ static struct json_object* -pool_time_series_to_json(const default_monitor_state_t* monitor) +pool_time_series_to_json(const default_monitor_state_t* monitor, bool reset) { ABT_mutex_spinlock( ABT_MUTEX_MEMORY_GET_HANDLE(&monitor->pool_time_series_mtx)); @@ -2343,6 +2428,14 @@ pool_time_series_to_json(const default_monitor_state_t* monitor) } } + if (!reset) goto finish; + + for (size_t i = 0; i < num_pools; i++) { + time_series_clear(&(monitor->pool_size_time_series[i])); + time_series_clear(&(monitor->pool_total_size_time_series[i])); + } + +finish: ABT_mutex_unlock( ABT_MUTEX_MEMORY_GET_HANDLE(&monitor->pool_time_series_mtx)); diff --git a/src/margo-monitoring.c b/src/margo-monitoring.c index 9aa3c022..33bc3df9 100644 --- a/src/margo-monitoring.c +++ b/src/margo-monitoring.c @@ -55,3 +55,15 @@ hg_return_t margo_request_get_monitoring_data(margo_request req, if (data) *data = req->monitor_data; return HG_SUCCESS; } + +hg_return_t margo_monitor_dump(margo_instance_id mid, + margo_monitor_dump_fn dump_fn, + void* uargs, + bool reset) +{ + if (!mid) return HG_INVALID_ARG; + if (!mid->monitor) return HG_SUCCESS; + if (!mid->monitor->dump) return HG_NOENTRY; + mid->monitor->dump(mid->monitor->uargs, dump_fn, uargs, reset); + return HG_SUCCESS; +} diff --git a/tests/unit-tests/margo-monitoring.c b/tests/unit-tests/margo-monitoring.c index 9bcff99a..2c990094 100644 --- a/tests/unit-tests/margo-monitoring.c +++ b/tests/unit-tests/margo-monitoring.c @@ -342,6 +342,39 @@ static MunitResult test_custom_monitoring(const MunitParameter params[], return MUNIT_OK; } +static void check_json_statistics_content( + struct json_object* content, + uint16_t provider_id_param, + const char* self_addr_str, + bool relay, + bool expected_zeros); + +struct dump_statistics_args { + uint16_t provider_id_param; + const char* self_addr_str; + bool relay; +}; + +static void dump_statistics(void* uargs, const char* content, size_t size) { + struct dump_statistics_args* args = (struct dump_statistics_args*)uargs; + + struct json_object* json_content = NULL; + struct json_tokener* tokener = json_tokener_new(); + json_content = json_tokener_parse_ex(tokener, content, size); + json_tokener_free(tokener); + + munit_assert_not_null(json_content); + munit_assert(json_object_is_type(json_content, json_type_object)); + struct json_object* json_stats = json_object_object_get(json_content, "stats"); + + check_json_statistics_content( + json_stats, args->provider_id_param, + args->self_addr_str, args->relay, false); + + json_object_put(json_content); +} + + static MunitResult test_default_monitoring_statistics(const MunitParameter params[], void* data) { @@ -350,6 +383,7 @@ static MunitResult test_default_monitoring_statistics(const MunitParameter param const char* protocol = munit_parameters_get(params, "protocol"); uint16_t provider_id_param = atoi(munit_parameters_get(params, "provider_id")); hg_bool_t relay = strcmp(munit_parameters_get(params, "relay"), "true") == 0 ? HG_TRUE : HG_FALSE; + bool reset = strcmp(munit_parameters_get(params, "reset"), "true") == 0 ? true : false; const char* json_config = "{\"monitoring\":{" "\"config\":{" @@ -433,6 +467,14 @@ static MunitResult test_default_monitoring_statistics(const MunitParameter param hret = margo_monitor_call_user(mid, MARGO_MONITOR_FN_START, NULL); munit_assert_int(hret, ==, HG_SUCCESS); + struct dump_statistics_args dump_args = { + .provider_id_param = provider_id_param, + .relay = relay, + .self_addr_str = self_addr_str + }; + hret = margo_monitor_dump(mid, dump_statistics, &dump_args, reset); + munit_assert_int(hret, ==, HG_SUCCESS); + margo_finalize(mid); char* filename = NULL; @@ -461,18 +503,40 @@ static MunitResult test_default_monitoring_statistics(const MunitParameter param struct json_tokener* tokener = json_tokener_new(); json_content = json_tokener_parse_ex(tokener, file_content, file_size); json_tokener_free(tokener); + + check_json_statistics_content( + json_content, provider_id_param, self_addr_str, relay, reset); + + json_object_put(json_content); + free(file_content); + free(filename); + + return MUNIT_OK; +} + +static void check_json_statistics_content( + struct json_object* json_content, + uint16_t provider_id_param, + const char* self_addr_str, + bool relay, + bool expected_zeros) +{ munit_assert_not_null(json_content); munit_assert(json_object_is_type(json_content, json_type_object)); #define ASSERT_JSON_HAS_KEY(parent, key_name, key_obj, type) \ - struct json_object* key_obj = json_object_object_get(parent, key_name); \ - munit_assert_not_null(key_obj); \ - munit_assert(json_object_is_type(key_obj, json_type_##type)) + struct json_object* key_obj = NULL; \ + do { \ + key_obj = json_object_object_get(parent, key_name); \ + munit_assert_not_null(key_obj); \ + munit_assert(json_object_is_type(key_obj, json_type_##type)); \ + } while(0) + #define ASSERT_JSON_HAS(parent, key, type) ASSERT_JSON_HAS_KEY(parent, #key, key, type) -#define ASSERT_JSON_HAS_STATS(parent, key) \ +#define ASSERT_JSON_HAS_STATS(parent, key, zero) \ do { \ ASSERT_JSON_HAS(parent, key, object); \ ASSERT_JSON_HAS(key, num, int); \ @@ -481,22 +545,30 @@ static MunitResult test_default_monitoring_statistics(const MunitParameter param ASSERT_JSON_HAS(key, avg, double); \ ASSERT_JSON_HAS(key, var, double); \ ASSERT_JSON_HAS(key, sum, double); \ + if(zero) { \ + munit_assert_int(0, ==, json_object_get_int64(json_object_object_get(key, "num"))); \ + munit_assert_double(0, ==, json_object_get_double(json_object_object_get(key, "min"))); \ + munit_assert_double(0, ==, json_object_get_double(json_object_object_get(key, "max"))); \ + munit_assert_double(0, ==, json_object_get_double(json_object_object_get(key, "avg"))); \ + munit_assert_double(0, ==, json_object_get_double(json_object_object_get(key, "var"))); \ + munit_assert_double(0, ==, json_object_get_double(json_object_object_get(key, "sum"))); \ + } \ } while(0) -#define ASSERT_JSON_HAS_DOUBLE_STATS(parent, key, secondary) \ +#define ASSERT_JSON_HAS_DOUBLE_STATS(parent, key, secondary, zero) \ do { \ ASSERT_JSON_HAS(parent, key, object); \ - ASSERT_JSON_HAS_STATS(key, duration); \ - ASSERT_JSON_HAS_STATS(key, secondary); \ + ASSERT_JSON_HAS_STATS(key, duration, zero); \ + ASSERT_JSON_HAS_STATS(key, secondary, zero); \ } while(0) { // check for the "progress_loop" section ASSERT_JSON_HAS(json_content, progress_loop, object); - ASSERT_JSON_HAS_STATS(progress_loop, progress_with_timeout); - ASSERT_JSON_HAS_STATS(progress_loop, progress_timeout_value_msec); - ASSERT_JSON_HAS_STATS(progress_loop, progress_without_timeout); - ASSERT_JSON_HAS_STATS(progress_loop, trigger); + ASSERT_JSON_HAS_STATS(progress_loop, progress_with_timeout, false); + ASSERT_JSON_HAS_STATS(progress_loop, progress_timeout_value_msec, false); + ASSERT_JSON_HAS_STATS(progress_loop, progress_without_timeout, false); + ASSERT_JSON_HAS_STATS(progress_loop, trigger, false); // check for the "rpcs" secions ASSERT_JSON_HAS(json_content, rpcs, object); @@ -534,11 +606,11 @@ static MunitResult test_default_monitoring_statistics(const MunitParameter param // "origin" section must have an object corresponding to the address sprintf(addr_key, "sent to %s", self_addr_str); ASSERT_JSON_HAS_KEY(origin, addr_key, sent_to, object); - ASSERT_JSON_HAS_DOUBLE_STATS(sent_to, iforward, relative_timestamp_from_create); - ASSERT_JSON_HAS_DOUBLE_STATS(sent_to, forward_cb, relative_timestamp_from_iforward_start); - ASSERT_JSON_HAS_DOUBLE_STATS(sent_to, iforward_wait, relative_timestamp_from_iforward_end); - ASSERT_JSON_HAS_DOUBLE_STATS(sent_to, set_input, relative_timestamp_from_iforward_start); - ASSERT_JSON_HAS_DOUBLE_STATS(sent_to, get_output, relative_timestamp_from_wait_end); + ASSERT_JSON_HAS_DOUBLE_STATS(sent_to, iforward, relative_timestamp_from_create, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(sent_to, forward_cb, relative_timestamp_from_iforward_start, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(sent_to, iforward_wait, relative_timestamp_from_iforward_end, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(sent_to, set_input, relative_timestamp_from_iforward_start, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(sent_to, get_output, relative_timestamp_from_wait_end, expected_zeros); } // RPC must have an "target" section ASSERT_JSON_HAS(echo, target, object); @@ -547,29 +619,29 @@ static MunitResult test_default_monitoring_statistics(const MunitParameter param sprintf(addr_key, "received from %s", self_addr_str); ASSERT_JSON_HAS_KEY(target, addr_key, received_from, object); ASSERT_JSON_HAS(received_from, handler, object); - ASSERT_JSON_HAS_STATS(handler, duration); - ASSERT_JSON_HAS_DOUBLE_STATS(received_from, ult, relative_timestamp_from_handler_start); - ASSERT_JSON_HAS_DOUBLE_STATS(received_from, irespond, relative_timestamp_from_ult_start); - ASSERT_JSON_HAS_DOUBLE_STATS(received_from, respond_cb, relative_timestamp_from_irespond_start); - ASSERT_JSON_HAS_DOUBLE_STATS(received_from, irespond_wait, relative_timestamp_from_irespond_end); - ASSERT_JSON_HAS_DOUBLE_STATS(received_from, set_output, relative_timestamp_from_irespond_start); - ASSERT_JSON_HAS_DOUBLE_STATS(received_from, get_input, relative_timestamp_from_ult_start); + ASSERT_JSON_HAS_STATS(handler, duration, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(received_from, ult, relative_timestamp_from_handler_start, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(received_from, irespond, relative_timestamp_from_ult_start, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(received_from, respond_cb, relative_timestamp_from_irespond_start, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(received_from, irespond_wait, relative_timestamp_from_irespond_end, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(received_from, set_output, relative_timestamp_from_irespond_start, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(received_from, get_input, relative_timestamp_from_ult_start, expected_zeros); // "received from ..." section must have a "bulk" section ASSERT_JSON_HAS(received_from, bulk, object); // "bulk" section must have a "create" section ASSERT_JSON_HAS(bulk, create, object); - ASSERT_JSON_HAS_STATS(create, duration); - ASSERT_JSON_HAS_STATS(create, size); + ASSERT_JSON_HAS_STATS(create, duration, expected_zeros); + ASSERT_JSON_HAS_STATS(create, size, expected_zeros); // "bulk" section must have a "pull from ..." section char pull_from_key[512]; sprintf(pull_from_key, "pull from %s", self_addr_str); ASSERT_JSON_HAS_KEY(bulk, pull_from_key, pull_from, object); // "pull from ..." secion must have a "itransfer" section ASSERT_JSON_HAS(pull_from, itransfer, object); - ASSERT_JSON_HAS_STATS(itransfer, duration); - ASSERT_JSON_HAS_STATS(itransfer, size); - ASSERT_JSON_HAS_DOUBLE_STATS(pull_from, transfer_cb, relative_timestamp_from_itransfer_start); - ASSERT_JSON_HAS_DOUBLE_STATS(pull_from, itransfer_wait, relative_timestamp_from_itransfer_end); + ASSERT_JSON_HAS_STATS(itransfer, duration, expected_zeros); + ASSERT_JSON_HAS_STATS(itransfer, size, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(pull_from, transfer_cb, relative_timestamp_from_itransfer_start, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(pull_from, itransfer_wait, relative_timestamp_from_itransfer_end, expected_zeros); } } // must have an "65535:65535:65535:65535" secion with a bulk create @@ -594,8 +666,8 @@ static MunitResult test_default_monitoring_statistics(const MunitParameter param ASSERT_JSON_HAS(received_from, bulk, object); // "bulk" secion must have a "create" section ASSERT_JSON_HAS(bulk, create, object); - ASSERT_JSON_HAS_STATS(create, duration); - ASSERT_JSON_HAS_STATS(create, size); + ASSERT_JSON_HAS_STATS(create, duration, expected_zeros); + ASSERT_JSON_HAS_STATS(create, size, expected_zeros); } if(relay == HG_TRUE) { #if (HG_VERSION_MAJOR > 2) || (HG_VERSION_MAJOR == 2 && HG_VERSION_MINOR >= 3) @@ -629,11 +701,11 @@ static MunitResult test_default_monitoring_statistics(const MunitParameter param // "origin" section must have a section index by destination address sprintf(addr_key, "sent to %s", self_addr_str); ASSERT_JSON_HAS_KEY(origin, addr_key, sent_to, object); - ASSERT_JSON_HAS_DOUBLE_STATS(sent_to, iforward, relative_timestamp_from_create); - ASSERT_JSON_HAS_DOUBLE_STATS(sent_to, forward_cb, relative_timestamp_from_iforward_start); - ASSERT_JSON_HAS_DOUBLE_STATS(sent_to, iforward_wait, relative_timestamp_from_iforward_end); - ASSERT_JSON_HAS_DOUBLE_STATS(sent_to, set_input, relative_timestamp_from_iforward_start); - ASSERT_JSON_HAS_DOUBLE_STATS(sent_to, get_output, relative_timestamp_from_wait_end); + ASSERT_JSON_HAS_DOUBLE_STATS(sent_to, iforward, relative_timestamp_from_create, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(sent_to, forward_cb, relative_timestamp_from_iforward_start, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(sent_to, iforward_wait, relative_timestamp_from_iforward_end, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(sent_to, set_input, relative_timestamp_from_iforward_start, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(sent_to, get_output, relative_timestamp_from_wait_end, expected_zeros); } // RPC must have an "target" section ASSERT_JSON_HAS(echo, target, object); @@ -642,39 +714,61 @@ static MunitResult test_default_monitoring_statistics(const MunitParameter param sprintf(addr_key, "received from %s", self_addr_str); ASSERT_JSON_HAS_KEY(target, addr_key, received_from, object); ASSERT_JSON_HAS(received_from, handler, object); - ASSERT_JSON_HAS_STATS(handler, duration); - ASSERT_JSON_HAS_DOUBLE_STATS(received_from, ult, relative_timestamp_from_handler_start); - ASSERT_JSON_HAS_DOUBLE_STATS(received_from, irespond, relative_timestamp_from_ult_start); - ASSERT_JSON_HAS_DOUBLE_STATS(received_from, respond_cb, relative_timestamp_from_irespond_start); - ASSERT_JSON_HAS_DOUBLE_STATS(received_from, irespond_wait, relative_timestamp_from_irespond_end); - ASSERT_JSON_HAS_DOUBLE_STATS(received_from, set_output, relative_timestamp_from_irespond_start); - ASSERT_JSON_HAS_DOUBLE_STATS(received_from, get_input, relative_timestamp_from_ult_start); + ASSERT_JSON_HAS_STATS(handler, duration, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(received_from, ult, relative_timestamp_from_handler_start, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(received_from, irespond, relative_timestamp_from_ult_start, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(received_from, respond_cb, relative_timestamp_from_irespond_start, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(received_from, irespond_wait, relative_timestamp_from_irespond_end, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(received_from, set_output, relative_timestamp_from_irespond_start, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(received_from, get_input, relative_timestamp_from_ult_start, expected_zeros); // "received from ..." must have a "bulk" section ASSERT_JSON_HAS(received_from, bulk, object); // "bulk" secion must have a "create" section ASSERT_JSON_HAS(bulk, create, object); // "bulk" section must have a section index by source address - ASSERT_JSON_HAS_STATS(create, duration); - ASSERT_JSON_HAS_STATS(create, size); + ASSERT_JSON_HAS_STATS(create, duration, expected_zeros); + ASSERT_JSON_HAS_STATS(create, size, expected_zeros); // "bulk" section must have a "pull from ..." section char pull_from_key[512]; sprintf(pull_from_key, "pull from %s", self_addr_str); ASSERT_JSON_HAS_KEY(bulk, pull_from_key, pull_from, object); // "pull from..." secion must have a "itransfer" secion ASSERT_JSON_HAS(pull_from, itransfer, object); - ASSERT_JSON_HAS_STATS(itransfer, duration); - ASSERT_JSON_HAS_STATS(itransfer, size); - ASSERT_JSON_HAS_DOUBLE_STATS(pull_from, transfer_cb, relative_timestamp_from_itransfer_start); - ASSERT_JSON_HAS_DOUBLE_STATS(pull_from, itransfer_wait, relative_timestamp_from_itransfer_end); + ASSERT_JSON_HAS_STATS(itransfer, duration, expected_zeros); + ASSERT_JSON_HAS_STATS(itransfer, size, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(pull_from, transfer_cb, relative_timestamp_from_itransfer_start, expected_zeros); + ASSERT_JSON_HAS_DOUBLE_STATS(pull_from, itransfer_wait, relative_timestamp_from_itransfer_end, expected_zeros); } } } } +} + +static void check_json_time_series_content( + struct json_object* content, + uint16_t provider_id_param, + bool expected_zeros); + +struct dump_time_series_args { + uint16_t provider_id_param; +}; + +static void dump_time_series(void* uargs, const char* content, size_t size) { + struct dump_time_series_args* args = (struct dump_time_series_args*)uargs; + + struct json_object* json_content = NULL; + struct json_tokener* tokener = json_tokener_new(); + json_content = json_tokener_parse_ex(tokener, content, size); + json_tokener_free(tokener); + + munit_assert_not_null(json_content); + munit_assert(json_object_is_type(json_content, json_type_object)); + struct json_object* json_stats = json_object_object_get(json_content, "series"); + + check_json_time_series_content( + json_stats, args->provider_id_param, false); json_object_put(json_content); - free(file_content); - free(filename); - return MUNIT_OK; } static MunitResult test_default_monitoring_time_series(const MunitParameter params[], @@ -685,6 +779,8 @@ static MunitResult test_default_monitoring_time_series(const MunitParameter para const char* protocol = munit_parameters_get(params, "protocol"); uint16_t provider_id_param = atoi(munit_parameters_get(params, "provider_id")); hg_bool_t relay = strcmp(munit_parameters_get(params, "relay"), "true") == 0 ? HG_TRUE : HG_FALSE; + bool reset = strcmp(munit_parameters_get(params, "reset"), "true") == 0 ? HG_TRUE : HG_FALSE; + bool expected_zeros = reset; const char* json_config = "{\"monitoring\":{" "\"config\":{" @@ -768,6 +864,12 @@ static MunitResult test_default_monitoring_time_series(const MunitParameter para hret = margo_monitor_call_user(mid, MARGO_MONITOR_FN_START, NULL); munit_assert_int(hret, ==, HG_SUCCESS); + struct dump_time_series_args dump_args = { + .provider_id_param = provider_id_param + }; + hret = margo_monitor_dump(mid, dump_time_series, &dump_args, reset); + munit_assert_int(hret, ==, HG_SUCCESS); + margo_finalize(mid); char* filename = NULL; @@ -796,6 +898,20 @@ static MunitResult test_default_monitoring_time_series(const MunitParameter para struct json_tokener* tokener = json_tokener_new(); json_content = json_tokener_parse_ex(tokener, file_content, file_size); json_tokener_free(tokener); + + check_json_time_series_content(json_content, provider_id_param, expected_zeros); + + json_object_put(json_content); + free(file_content); + free(filename); + return MUNIT_OK; +} + +static void check_json_time_series_content( + struct json_object* json_content, + uint16_t provider_id_param, + bool expected_zeros) +{ munit_assert_not_null(json_content); munit_assert(json_object_is_type(json_content, json_type_object)); @@ -828,16 +944,12 @@ static MunitResult test_default_monitoring_time_series(const MunitParameter para ASSERT_JSON_HAS(__primary__, total_size, array); } } - - json_object_put(json_content); - free(file_content); - free(filename); - return MUNIT_OK; } static char* protocol_params[] = {"na+sm", NULL}; static char* provider_id_params[] = {"65535", "42", "0", NULL}; static char* relay_params[] = {"true", "false", NULL}; +static char* reset_params[] = {"true", "false", NULL}; static MunitParameterEnum test_params_custom[] = {{"protocol", protocol_params}, @@ -847,6 +959,7 @@ static MunitParameterEnum test_params[] = {{"protocol", protocol_params}, {"provider_id", provider_id_params}, {"relay", relay_params}, + {"reset", reset_params}, {NULL, NULL}}; static MunitTest test_suite_tests[] = {