Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for job arrays. #327

Draft
wants to merge 1 commit into
base: xdmod10.5
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 195 additions & 0 deletions background_scripts/supremm_arrayjobs.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
<?php
require_once __DIR__ . '/../configuration/linker.php';

use CCR\DB;
use CCR\Log;
use CCR\DB\MySQLHelper;


/**
 * Build the run configuration from built-in defaults and command line options.
 *
 * Defaults: the time window ends now and starts 10 days earlier. Command line
 * options are applied in the order getopt() returns them, so a later option
 * overrides an earlier one that sets the same key.
 *
 * An unparseable --start / --end value is rejected with an error message on
 * STDERR and exit status 1. Previously strtotime()'s false return value was
 * stored as-is and handed to the SQL time window.
 *
 * @return array configuration with the keys 'emailSubject', 'start' and 'end'
 *               (unix timestamps) and, when given on the command line,
 *               'resource' and 'consoleLogLevel'.
 */
function get_config()
{
    $conf = array(
        'emailSubject' => gethostname() . ': XDMOD: Data Warehouse: SUPReMM ETL Log',
    );

    // Keys are the short options, values the matching long options; a
    // trailing colon marks an option that requires a value.
    $options = array(
        'h' => 'help',
        'q' => 'quiet',
        'v' => 'verbose',
        'r:' => 'resource:',
        'a' => 'all',
        's:' => 'start:',
        'e:' => 'end:',
        'd' => 'debug'
    );

    $conf['end'] = time();
    $conf['start'] = $conf['end'] - (60*60*24 * 10);

    // Convert a --start/--end value to a unix timestamp, aborting the script
    // when the value cannot be parsed. A repeated option arrives from
    // getopt() as an array and is rejected too.
    $parseTime = function ($optionName, $value) {
        $timestamp = is_string($value) ? strtotime($value) : false;
        if ($timestamp === false) {
            $shown = is_string($value) ? $value : '(multiple values)';
            fwrite(
                STDERR,
                'Invalid value for --' . $optionName . ': "' . $shown . '" is not a recognised date/time.' . "\n"
            );
            exit(1);
        }
        return $timestamp;
    };

    $args = getopt(implode('', array_keys($options)), $options);
    if ($args === false) {
        // getopt() signals failure with false; treat it as "no options given".
        $args = array();
    }

    foreach ($args as $arg => $value) {
        switch ($arg) {
            case 'h':
            case 'help':
                usage_and_exit();
                break;
            case 'q':
            case 'quiet':
                $conf['consoleLogLevel'] = CCR\Log::ERR;
                break;
            case 'v':
            case 'verbose':
                $conf['consoleLogLevel'] = CCR\Log::INFO;
                break;
            case 'r':
            case 'resource':
                $conf['resource'] = $value;
                break;
            case 's':
            case 'start':
                $conf['start'] = $parseTime('start', $value);
                break;
            case 'e':
            case 'end':
                $conf['end'] = $parseTime('end', $value);
                break;
            case 'a':
            case 'all':
                // No time restriction: from the epoch up to now.
                $conf['start'] = 0;
                $conf['end'] = time();
                break;
            case 'd':
            case 'debug':
                $conf['consoleLogLevel'] = CCR\Log::DEBUG;
                break;
            default:
                break;
        }
    }

    return $conf;
}

/**
 * Print the command line help text to STDERR and terminate the script
 * with exit status 1.
 */
function usage_and_exit()
{
    global $argv;

    // Name the script was invoked as, shown on the first line of the help.
    $program = $argv[0];

    $helpText = <<<EOMSG
Usage: {$program}
-h, --help Display this help

Controlling which jobs are processed:
-r, --resource=RESOURCE Only process array jobs on the given resource.
(default all resources in the SUPREMM realm are
processed). The resource code or the resource_id
may be specified.
-s, --start=START_TIME Specify the start of the time window. (default
10 days ago).
-e, --end=END_TIME Specify the end of the time window. (default
now).
-a, --all Do not restrict the jobs by time.

Controlling log output:
-q, --quiet Quiet mode. Only print error messages to the
console
-v, --verbose Enable informational messages to the console
-d, --debug Enable debug messages to the console
EOMSG;

    fwrite(STDERR, $helpText);
    exit(1);
}


/**
 * Get the list of resources to process.
 *
 * Starts from the resources reported by SupremmDbInterface::getResources()
 * and keeps those present in `modw`.`resourcefact`. When $conf['resource'] is
 * set, the list is further restricted to the resource whose id or code
 * matches that value.
 *
 * @param array $conf configuration as returned by get_config(); only the
 *                    optional 'resource' key is read.
 *
 * @return array the matching resourcefact ids (first column of each row);
 *               empty when no resources are configured or none match.
 */
function get_resource_list($conf)
{
    $s = new \DataWarehouse\Query\SUPREMM\SupremmDbInterface();
    $resources = $s->getResources();

    // An empty list would produce "IN ()", which is not valid SQL.
    if (empty($resources)) {
        return array();
    }

    // Bind every id instead of concatenating it into the statement text.
    $args = array();
    $placeholders = array();
    foreach (array_values($resources) as $index => $resourceId) {
        $name = 'rid' . $index;
        $placeholders[] = ':' . $name;
        $args[$name] = $resourceId;
    }

    $stmt = 'SELECT id FROM `modw`.`resourcefact` WHERE id IN (' . implode(',', $placeholders) . ')';

    if (isset($conf['resource'])) {
        // Two distinct marker names: PDO only allows a named marker to be
        // reused within one statement when emulated prepares are enabled.
        $stmt .= ' AND (id = :resource_id OR code = :resource_code)';
        $args['resource_id'] = $conf['resource'];
        $args['resource_code'] = $conf['resource'];
    }

    $db = DB::factory('datawarehouse');
    $query = $db->handle()->prepare($stmt);
    $query->execute($args);

    return $query->fetchAll(PDO::FETCH_COLUMN, 0);
}

/**
 * Record which jobs on a resource belong to the same job array.
 *
 * Works in three steps:
 *  1. (Re)create the scratch table `modw_supremm`.`job_array_tmp` holding the
 *     array-job tasks of the resource whose end_time_ts lies in the window.
 *  2. Self-join the scratch table on resource, submit time and local job id
 *     and INSERT IGNORE every pair of distinct jobs into
 *     `modw_supremm`.`job_array_peers`.
 *  3. Drop the scratch table again.
 *
 * Uses the global $logger for debug output.
 *
 * @param mixed $resource_id the resource id to process
 * @param mixed $start       lower bound compared against job.end_time_ts
 * @param mixed $end         upper bound compared against job.end_time_ts
 *
 * @return null
 */
function get_array_jobs($resource_id, $start, $end)
{
    global $logger;

    $dropTmpSql = 'DROP TABLE IF EXISTS `modw_supremm`.`job_array_tmp`';
    $createTmpSql = 'CREATE TABLE `modw_supremm`.`job_array_tmp` (KEY (resource_id, local_jobid, submit_time_ts, _id)) SELECT s._id, j.resource_id, j.local_jobid, j.submit_time_ts FROM modw_supremm.job s, modw.job_tasks j WHERE s.tg_job_id = j.job_id AND j.local_job_array_index != -1 and j.local_job_id_raw is not null and s.end_time_ts BETWEEN :start AND :end AND s.resource_id = :resource_id';
    $insertPeersSql = 'INSERT IGNORE INTO `modw_supremm`.`job_array_peers` SELECT DISTINCT s1._id as job_id, s2._id as other_job_id FROM `modw_supremm`.`job_array_tmp` s1, `modw_supremm`.`job_array_tmp` s2 WHERE s1.resource_id = s2.resource_id and s1.submit_time_ts = s2.submit_time_ts and s1.local_jobid = s2.local_jobid and s1._id != s2._id';

    $db = DB::factory('datawarehouse');

    $logger->debug(
        sprintf('Checking for array jobs on resource_id=%s between %s and %s', $resource_id, $start, $end)
    );

    // Step 1: start from a clean scratch table, then fill it for this window.
    $db->handle()->exec($dropTmpSql);
    $window = array(
        'start' => $start,
        'end' => $end,
        'resource_id' => $resource_id,
    );
    $fillTmp = $db->handle()->prepare($createTmpSql);
    $fillTmp->execute($window);

    $logger->debug(
        sprintf('Updating array job mapping table on resource_id=%s between %s and %s', $resource_id, $start, $end)
    );

    // Step 2: pair up the tasks that share an array submission.
    $db->handle()->exec($insertPeersSql);

    // Step 3: the scratch table is no longer needed.
    $db->handle()->exec($dropTmpSql);
}

// Script entry point: parse the options, set up logging, then update the
// array job mapping for every selected resource.
$conf = get_config();

$logger = CCR\Log::factory('SUPREMM', $conf);

// Log the exact command line, shell-quoted so it can be copied and re-run.
$quotedArgs = array();
foreach ($argv as $argument) {
    $quotedArgs[] = escapeshellarg($argument);
}
$cmd = implode(' ', $quotedArgs);
$logger->info('Command: ' . $cmd);

$startNotice = array(
    'message' => 'process array jobs start',
    'process_start_time' => date('Y-m-d H:i:s'),
);
$logger->notice($startNotice);

try {
    $resourceIds = get_resource_list($conf);
    foreach ($resourceIds as $resource_id) {
        get_array_jobs($resource_id, $conf['start'], $conf['end']);
    }
} catch (\Exception $e) {
    // A failure is logged and ends the loop; the end notice below is still
    // written.
    $logger->err(
        array(
            'message' => 'Caught exception while executing: ' . $e->getMessage(),
            'stacktrace' => $e->getTraceAsString()
        )
    );
}

$endNotice = array(
    'message' => 'process array jobs end',
    'process_end_time' => date('Y-m-d H:i:s'),
);
$logger->notice($endNotice);
5 changes: 3 additions & 2 deletions background_scripts/xdmod-supremm-admin
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,16 @@ function truncateAction($config, $deleteSqlData)
}

$multiDel = <<<EOF
DELETE FROM `modw_supremm` . `job_name` , `modw_supremm` . `job_peers`
DELETE FROM `modw_supremm` . `job_name` , `modw_supremm` . `job_peers`, `modw_supremm` . `job_array_peers`
USING `modw_supremm`.`job`
LEFT JOIN `modw_supremm`.`job_name` ON `modw_supremm`.`job`.jobname_id = `modw_supremm`.`job_name`.id
LEFT JOIN `modw_supremm`.`job_peers` ON `modw_supremm`.`job`._id = `modw_supremm`.`job_peers`.job_id
LEFT JOIN `modw_supremm`.`job_array_peers` ON `modw_supremm`.`job`._id = `modw_supremm`.`job_array_peers`.job_id
WHERE
`modw_supremm`.`job`.resource_id = :resource_id
EOF;
$rows = $db->execute($multiDel, array('resource_id' => $resource_id));
$logger->notice('Deleted ' . $rows . ' rows from job_name and job_peers tables.');
$logger->notice('Deleted ' . $rows . ' rows from job_name, job array peers and job_peers tables.');
}
}

Expand Down
2 changes: 2 additions & 0 deletions bin/aggregate_supremm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ shift $((OPTIND-1))

php ${XDMOD_LIB_PATH}/supremm_sharedjobs.php $FLAGS

php ${XDMOD_LIB_PATH}/supremm_arrayjobs.php $FLAGS

php ${XDMOD_LIB_PATH}/aggregate_supremm.php $FLAGS $AGG_FLAGS

${XDMOD_BIN_PATH}/xdmod-build-filter-lists --realm SUPREMM $FLAGS
Expand Down
34 changes: 34 additions & 0 deletions classes/DataWarehouse/Query/SUPREMM/JobDataset.php
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ public function __construct(
$this->addField(new TableField($rf, 'timezone'));
$this->addField(new TableField($rf, 'code', 'resource'));

$jt = new Table(new Schema('modw'), 'job_tasks', 'jt');
$this->addTable($jt);
$this->addWhereCondition(new WhereCondition(new TableField($dataTable, 'tg_job_id'), '=', new TableField($jt, 'job_id')));
$this->addField(new TableField($jt, 'local_job_array_index', 'local_job_array_index'));

} elseif ($stat == "peers") {
$jp = new Table(new Schema("modw_supremm"), "job_peers", "jp");
$this->joinTo($jp, "_id", "other_job_id", "jobid", "job_id");
Expand All @@ -145,6 +150,35 @@ public function __construct(
$this->addWhereCondition(new WhereCondition(new TableField($jf, "person_id"), '=', new TableField($pt, "id")));
$this->addField(new TableField($pt, "long_name", "name"));

$this->addOrder(
new \DataWarehouse\Query\Model\OrderBy(
new TableField($jf, 'start_time_ts'),
'asc',
'start_time_ts'
)
);
} elseif ($stat == "array") {

$jp = new Table(new Schema("modw_supremm"), "job_array_peers", "jp");
$this->joinTo($jp, "_id", "other_job_id", "jobid", "job_id");

$jf = new Table(new Schema("modw_supremm"), "job", "jf1");
$this->addTable($jf);
$this->addWhereCondition(new WhereCondition(new TableField($jp, "other_job_id"), '=', new TableField($jf, "_id")));
$this->addField(new TableField($jf, "local_job_id"));
$this->addField(new TableField($jf, "start_time_ts"));
$this->addField(new TableField($jf, "end_time_ts"));

$rt = new Table(new Schema("modw"), "resourcefact", "rf");
$this->addTable($rt);
$this->addWhereCondition(new WhereCondition(new TableField($jf, "resource_id"), '=', new TableField($rt, "id")));
$this->addField(new TableField($rt, "code", "resource"));

$pt = new Table(new Schema('modw'), 'person', 'p');
$this->addTable($pt);
$this->addWhereCondition(new WhereCondition(new TableField($jf, "person_id"), '=', new TableField($pt, "id")));
$this->addField(new TableField($pt, "long_name", "name"));

$this->addOrder(
new \DataWarehouse\Query\Model\OrderBy(
new TableField($jf, 'start_time_ts'),
Expand Down
4 changes: 4 additions & 0 deletions classes/DataWarehouse/Query/SUPREMM/JobMetadata.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ public function getJobMetadata(XDUser $user, $jobid)
$available_data[\DataWarehouse\Query\RawQueryTypes::PEERS] = true;
}

if ($job['local_job_array_index'] != -1) {
$available_data[\DataWarehouse\Query\RawQueryTypes::ARRAY_PEERS] = true;
}

// Always report that analytics are present; the data endpoint will
// report the error reason for any that are missing.
$available_data[\DataWarehouse\Query\RawQueryTypes::ANALYTICS] = true;
Expand Down