Commit 9952b04d authored by Huff, Israel's avatar Huff, Israel
Browse files

- updated some SGE and PBS commands with input validity checks

 . in particular, this prevents "qsub" with no parameters which waits indefinitely on stdin
- added parsing for some SGE outputs (submitJob, checkJobs, deleteJobs)
parent 3122103c
Pipeline #97238 passed with stages
in 5 minutes and 22 seconds
......@@ -7,14 +7,27 @@ QString QueuePBS::listAllNodeInfo() { return "pbsnodes -a"; }
QString QueuePBS::listAllQueues() { return "qstat -q"; }
QString QueuePBS::checkJobs(QStringList jobIDs)
{
// TODO: optional -f job_id job_id ...
return "qstat ";
QString command = "qstat";
if (jobIDs.length() > 0) command += " -f " + jobIDs.join(" ");
return command;
}
QString QueuePBS::deleteJobs(QStringList jobIDs)
{ // TODO: qdel job_id job_id ...
return "qdel ";
{
if (jobIDs.length() == 0) return "";
int totalLen = 0;
for (auto id : jobIDs)
{
totalLen += id.trimmed().length();
}
if (totalLen == 0) return "";
return "qdel " + jobIDs.join(" ");
}
QString QueuePBS::submitJob(QString params)
{
// do not allow calling qsub using stdin since it will wait indefinitely
if (params.trimmed().length() == 0) return "";
return "qsub " + params;
}
QString QueuePBS::submitJob(QString params) { return "qsub " + params; }
QString QueuePBS::showQueueInfo(QString queueName)
{
return "showq " + queueName;
......
......@@ -16,9 +16,20 @@ QString QueueSGE::checkJobs(QStringList jobIDs)
QString QueueSGE::deleteJobs(QStringList jobIDs)
{
if (jobIDs.length() == 0) return "";
int totalLen = 0;
for (auto id : jobIDs)
{
totalLen += id.trimmed().length();
}
if (totalLen == 0) return "";
return "qdel " + jobIDs.join(" ");
}
QString QueueSGE::submitJob(QString params) { return "qsub " + params; }
QString QueueSGE::submitJob(QString params)
{
// do not allow calling qsub using stdin since it will wait indefinitely
if (params.trimmed().length() == 0) return "";
return "qsub " + params;
}
QString QueueSGE::showQueueInfo(QString queueName)
{
return "qconf -sq " + queueName;
......@@ -28,4 +39,79 @@ QString QueueSGE::showFreeProcs(QString params)
(void)sizeof(params);
return "qstat -g c";
}
// returns job ID on success or -1 on failure
int QueueSGE::parseSubmitJobReturn(QString str)
{
// examples:
// Your job 99 ("spin_test") has been submitted
// Your job - array 90.1 - 16 : 1(" test ")has been submitted;
int jobID = -1;
QRegularExpression re = QRegularExpression("job[^0-9]*([0-9]+)");
QRegularExpressionMatch rem = re.match(str);
if (rem.hasMatch())
{
jobID = rem.captured(1).toInt();
}
return jobID;
}
// returns QSet of job IDs on success or emtpy QSet on failure or if no jobs
// running
QSet<int> QueueSGE::parseCheckJobsReturn(QString str)
{
// examples:
// job-ID prior name user state submit/start at queue
// slots ja-task-ID
// -----------------------------------------------------------------------------------------------------------------
// 105 0.50000 spintest mpi_user r 04/14/2020 13:48:03
// all.q@rpi3-compute-node-1.clus 1 1 105 0.50000 spintest mpi_user
// r 04/14/2020 13:48:03 all.q@rpi3-compute-node-2.clus 1 2
QSet<int> jobIDs;
QRegularExpression re = QRegularExpression("[^0-9]*([0-9]+)");
QStringList subStrList =
str.split(QRegExp("[\r\n]"), QString::SkipEmptyParts);
for (auto subStr : subStrList)
{
QRegularExpressionMatch rem = re.match(subStr);
if (rem.hasMatch())
{
int jobID = rem.captured(1).toInt();
jobIDs.insert(jobID);
}
}
return jobIDs;
}
// returns QSet of job IDs on success or emtpy QSet on failure or if no jobs
// running
QSet<int> QueueSGE::parseDeleteJobsReturn(QString str)
{
// examples:
// mpi_user has registered the job-array task 112.1 for deletion
// mpi_user has registered the job-array task 112.2 for deletion
// ...
// mpi_user has deleted job 113
//
// denied: job "111" does not exist
// denied: job "112" does not exist
//
//
QSet<int> jobIDs;
QRegularExpression re = QRegularExpression("[^0-9]*([0-9]+)");
QStringList subStrList =
str.split(QRegExp("[\r\n]"), QString::SkipEmptyParts);
for (auto subStr : subStrList)
{
QRegularExpressionMatch rem = re.match(subStr);
if (rem.hasMatch())
{
int jobID = rem.captured(1).toInt();
jobIDs.insert(jobID);
}
}
return jobIDs;
}
} // namespace rsm
#ifndef RSMSGE_QUEUESGE_HH_
#define RSMSGE_QUEUESGE_HH_
#include <QRegularExpression>
#include <QRegularExpressionMatch>
#include <QSet>
#include "rsmcore/queuebase.hh"
namespace rsm
......@@ -16,6 +20,9 @@ class QueueSGE : public QueueBase
QString submitJob(QString params) override;
QString showQueueInfo(QString queueName) override;
QString showFreeProcs(QString params) override;
int parseSubmitJobReturn(QString str);
QSet<int> parseCheckJobsReturn(QString str);
QSet<int> parseDeleteJobsReturn(QString str);
}; // class QueueSGE
} // namespace rsm
......
......@@ -394,6 +394,7 @@ void SessionWorker::requestExec(QString command)
ssh_channel_free(channel);
return;
}
// TODO: should we use timeout or non-blocking version of this call?
int nbytes = ssh_channel_read(channel, buffer, sizeof(buffer), 0);
while (nbytes > 0)
{
......@@ -419,4 +420,4 @@ void SessionWorker::requestExec(QString command)
emit execFinished();
}
} // namespace rsm
\ No newline at end of file
} // namespace rsm
......@@ -34,7 +34,7 @@ ExamplePortalWidget::ExamplePortalWidget(QWidget *parent)
mUserNameEdit = new QLineEdit("", this);
mConnectButton = new QPushButton("Connect", this);
mCommandEdit = new QLineEdit(this);
mCommandSubmitButton = new QPushButton("Submit", this);
mCommandSubmitButton = new QPushButton("Submit Command", this);
mSchedulerBox = new QGroupBox(this);
mPBSRadioButton = new QRadioButton("PBS", this);
mSGERadioButton = new QRadioButton("SGE", this);
......@@ -43,8 +43,10 @@ ExamplePortalWidget::ExamplePortalWidget(QWidget *parent)
mGetQueueInfoTextEdit = new QLineEdit(this);
mGetAllNodeInfoButton = new QPushButton("Node Info", this);
mGetAllJobInfoButton = new QPushButton("Job Info", this);
mDeleteJobButton = new QPushButton("Delete Job", this);
mSubmitJobButton = new QPushButton("Submit Job", this);
mDeleteJobButton = new QPushButton("Delete Job(s)", this);
mDeleteJobTextEdit = new QLineEdit(this);
mSubmitJobTextEdit = new QLineEdit("", this);
mTextEdit = new QTextEdit(this);
......@@ -75,6 +77,8 @@ ExamplePortalWidget::ExamplePortalWidget(QWidget *parent)
layout->addWidget(mGetQueueInfoButton, row, 1);
layout->addWidget(mGetQueueInfoTextEdit, row, 2);
layout->addWidget(mGetAllNodeInfoButton, ++row, 1);
layout->addWidget(mSubmitJobButton, ++row, 1);
layout->addWidget(mSubmitJobTextEdit, row, 2);
layout->addWidget(mGetAllJobInfoButton, ++row, 1);
layout->addWidget(mDeleteJobButton, ++row, 1);
layout->addWidget(mDeleteJobTextEdit, row, 2);
......@@ -86,6 +90,7 @@ ExamplePortalWidget::ExamplePortalWidget(QWidget *parent)
connect(mSchedulerButtonGroup,
QOverload<int>::of(&QButtonGroup::buttonClicked), [=](int id) {
(void)sizeof(id);
QString name = mSchedulerButtonGroup->checkedButton()->text();
if ("PBS" == name)
queue = &queuePBS;
......@@ -104,6 +109,8 @@ ExamplePortalWidget::ExamplePortalWidget(QWidget *parent)
&ExamplePortalWidget::submitGetAllNodeInfoToHost);
connect(mGetAllJobInfoButton, &QPushButton::pressed, this,
&ExamplePortalWidget::submitGetAllJobInfoToHost);
connect(mSubmitJobButton, &QPushButton::pressed, this,
&ExamplePortalWidget::submitSubmitJobToHost);
connect(mDeleteJobButton, &QPushButton::pressed, this,
&ExamplePortalWidget::submitDeleteJobToHost);
......@@ -166,37 +173,43 @@ void ExamplePortalWidget::submitCommandToHost()
{
radix_tagged_line("submitCommandToHost()");
QString command = mCommandEdit->text();
mSession->requestExec(command);
submitCommand("custom", command);
}
void ExamplePortalWidget::submitListAllQueuesToHost()
{
radix_tagged_line("submitListAllQueuesToHost()");
QString command = queue->listAllQueues();
mSession->requestExec(command);
submitCommand("listAllQueues", command);
}
void ExamplePortalWidget::submitGetQueueInfoToHost()
{
radix_tagged_line("submitGetQueueInfoToHost()");
QString command = queue->showQueueInfo(mGetQueueInfoTextEdit->text());
fprintf(stderr, "%s\n", command.toStdString().c_str());
mSession->requestExec(command);
submitCommand("showQueueInfo", command);
}
void ExamplePortalWidget::submitGetAllNodeInfoToHost()
{
radix_tagged_line("submitGetAllNodeInfoToHost()");
QString command = queue->listAllNodeInfo();
mSession->requestExec(command);
submitCommand("listAllNodeInfo", command);
}
void ExamplePortalWidget::submitGetAllJobInfoToHost()
{
radix_tagged_line("submitGetAllJobInfoToHost()");
QString command = queue->checkJobs(QStringList());
fprintf(stderr, "%s\n", command.toStdString().c_str());
mSession->requestExec(command);
submitCommand("checkJobs", command);
}
void ExamplePortalWidget::submitSubmitJobToHost()
{
radix_tagged_line("submitSubmitJobToHost()");
QStringList jobIDs = QStringList();
QString command = queue->submitJob(mSubmitJobTextEdit->text());
submitCommand("submitJob", command);
}
void ExamplePortalWidget::submitDeleteJobToHost()
......@@ -206,15 +219,14 @@ void ExamplePortalWidget::submitDeleteJobToHost()
if (mDeleteJobTextEdit->text().length() > 0)
jobIDs = mDeleteJobTextEdit->text().split(QRegExp("[ ,]"));
QString command = queue->deleteJobs(jobIDs);
fprintf(stderr, "%d\n", jobIDs.size());
fprintf(stderr, "%s\n", command.toStdString().c_str());
mSession->requestExec(command);
submitCommand("deleteJobs", command);
}
void ExamplePortalWidget::slotExecOutputReady()
void ExamplePortalWidget::submitCommand(QString commandType, QString command)
{
QByteArray output = mSession->readExecOutput();
mTextEdit->append(output);
if ("" != execingCommandType) return;
execingCommandType = commandType;
mSession->requestExec(command);
}
void ExamplePortalWidget::connectionFailed(QString message)
......@@ -377,12 +389,36 @@ void ExamplePortalWidget::execFailed(QString message)
radix_tagged_line(message.toStdString());
mTextEdit->append(message);
mTextEdit->append("\n");
execingCommandType = "";
}
void ExamplePortalWidget::execFinished()
{
mTextEdit->append("Execution finished.\n");
if ("submitJob" == execingCommandType)
{
int jobID = queueSGE.parseSubmitJobReturn(mSession->readExecOutput());
fprintf(stderr, "%d\n", jobID);
}
if ("checkJobs" == execingCommandType)
{
QSet<int> jobIDs =
queueSGE.parseCheckJobsReturn(mSession->readExecOutput());
fprintf(stderr, "--\n");
for (auto jobID : jobIDs) fprintf(stderr, "%d\n", jobID);
fprintf(stderr, "--\n");
}
if ("deleteJobs" == execingCommandType)
{
QSet<int> jobIDs =
queueSGE.parseDeleteJobsReturn(mSession->readExecOutput());
fprintf(stderr, "--\n");
for (auto jobID : jobIDs) fprintf(stderr, "%d\n", jobID);
fprintf(stderr, "--\n");
}
execingCommandType = "";
}
MainWindow::MainWindow(QWidget *parent)
: QMainWindow(parent)
{
......
......@@ -44,8 +44,10 @@ class ExamplePortalWidget : public QWidget
QLineEdit *mGetQueueInfoTextEdit;
QPushButton *mGetAllNodeInfoButton;
QPushButton *mGetAllJobInfoButton;
QPushButton *mSubmitJobButton;
QPushButton *mDeleteJobButton;
QLineEdit *mDeleteJobTextEdit;
QLineEdit *mSubmitJobTextEdit;
QTextEdit *mTextEdit;
rsm::SessionController::SP mSession;
......@@ -53,6 +55,9 @@ class ExamplePortalWidget : public QWidget
rsm::QueueSGE queueSGE;
rsm::QueueBase *queue;
void submitCommand(QString commandType, QString command);
QString execingCommandType;
public:
ExamplePortalWidget(QWidget *parent = nullptr);
......@@ -63,8 +68,8 @@ class ExamplePortalWidget : public QWidget
void submitGetQueueInfoToHost();
void submitGetAllNodeInfoToHost();
void submitGetAllJobInfoToHost();
void submitSubmitJobToHost();
void submitDeleteJobToHost();
void slotExecOutputReady();
void connectionFailed(QString message);
void connectionSuccessful();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment