#
# Copyrights : CNRS
# Author : Oleg Lodygensky
# Acknowledgment : XtremWeb-HEP is based on XtremWeb 1.8.0 by inria : http://www.xtremweb.net/
# Web : http://www.xtremweb-hep.org
#
# This file is part of XtremWeb-HEP.
#
# XtremWeb-HEP is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# XtremWeb-HEP is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with XtremWeb-HEP. If not, see .
#
#
#
# File : xwhep.bridgesg2dg.pm
# Version : 0.1
# Created : Oct 30th, 2006
# Modified : Nov 16th, 2010
# Author : Oleg Lodygensky (lodygens _a_t_ lal dot in2p3 dot fr)
# CNRS - IN2P3 - LAL
# Purpose : this script binds XtremWeb-HEP as Globus scheduler
# this script is called by Globus GateKeeper
# so that EGEE can gain access to XtremWeb-HEP ressources
# License : GPL V2
# Web : http://www.xtremweb-hep.org
#
#
# Installation : this script must be installed in a glite UI
# in the directory
# /opt/globus/lib/perl/Globus/GRAM/JobManager
#
# Usage :
# $> voms-proxy-init --voms vo.lal.in2p3.fr
#
# $> glite-wms-job-submit -a -e https://grid25.lal.in2p3.fr:7443/glite_wms_wmproxy_server -r xw2.lal.in2p3.fr:2119/jobmanager-xw-lal WhoAmI.jdl
#
# Following is deprecated (kept for history, if someone someday writes a book on me :D )
#
# $> globus-job-run xw2.lal.in2p3.fr/jobmanager-fork /usr/bin/whoami
# $> globus-job-run xw2.lal.in2p3.fr/jobmanager-pbs /usr/bin/whoami
# $> globus-job-run xw2.lal.in2p3.fr/jobmanager-pbs -q lal /usr/bin/whoami
# $> cd ~/globus/
# $> edg-job-submit -r xw2.lal.in2p3.fr:2119/jobmanager-pbs-lal WhoAmI.jdl
# $> edg-job-status https://grid09.lal.in2p3.fr:9000/wG_tQ7qIbS8dl81jiqbIxQ
#
# $> mkdir ~/JobOutput
# $> cd ~/JobOutput/
# $> edg-job-get-output https://grid09.lal.in2p3.fr:9000/wG_tQ7qIbS8dl81jiqbIxQ
# $> cd lodygens_wG_tQ7qIbS8dl81jiqbIxQ/
# $> more std.out
#
#
use Globus::GRAM::Error;
use Globus::GRAM::JobState;
use Globus::GRAM::JobManager;
use Globus::Core::Paths;
use XML::Parser;
use XML::Dumper;
use Data::Dumper;
use Config;
use Cwd;
use File::Copy;
# NOTE: This package name must match the name of the .pm file!!
package Globus::GRAM::JobManager::xwhep;
@ISA = qw(Globus::GRAM::JobManager);
############################################################
#
# variable definitions
#
############################################################
my ($JAVA_HOME, $JAVA_BINDIR, $XWROOTDIR, $XWBINDIR, $XWCFGDIR, $xwsubmit, $xwstat, $xwresult, $xwdelete);
my %workAttributes;
############################################################
#
# This sets variable values
#
############################################################
BEGIN
{
$JAVA_HOME = "/usr/java/jdk1.6.0_07" || $ENV{JAVA_HOME} || $ENV{JAVA_INSTALL_PATH};
$JAVA_BINDIR = $JAVA_HOME."/bin";
$XWROOTDIR = "/opt/xwhep-bridge-7.0.0";
$XWBINDIR = "$XWROOTDIR/bin";
$XWCFGDIR = "$XWROOTDIR/conf";
$xwsubmit = "$XWBINDIR/xwsubmit";
$xwstat = "$XWBINDIR/xwstatus";
$xwresult = "$XWBINDIR/xwresult";
$xwdelete = "$XWBINDIR/xwrm";
}
############################################################
#
# This is the constructor
#
############################################################
sub new
{
my $proto = shift;
my $class = ref($proto) || $proto;
my $self = $class->SUPER::new(@_);
my $log_dir;
if(! exists($ENV{GLOBUS_SPOOL_DIR}))
{
$log_dir = $Globus::Core::Paths::tmpdir;
}
else
{
$log_dir = $ENV{GLOBUS_SPOOL_DIR};
}
# $self->{xtremweb_logfile} = "$log_dir/gram_xtremweb_log."
# . $self->{JobDescription}->uniq_id();
bless $self, $class;
return $self;
}
############################################################
#
# This submits new job to XtremWeb
# This retreives job definition from Globus::GRAM::JobDescription
#
############################################################
sub submit
{
my $self = shift;
my $description = $self->{JobDescription};
my $tag = $description->cache_tag() || $ENV{GLOBUS_GRAM_JOB_CONTACT};
my $status;
my $pbs_job_script;
my $pbs_job_script_name;
my $errfile = '';
my $job_id;
my $rsh_env;
my $script_url;
my @arguments;
my $email_when = '';
my $cache_pgm = "$Globus::Core::Paths::bindir/globus-gass-cache";
my %library_vars;
my $command = $xwsubmit;
my $result;
$self->append_path(\%ENV, 'JAVA_HOME', $JAVA_HOME);
$self->nfssync( $description->executable() )
unless $description->executable() eq '';
$self->nfssync( $description->stdin() )
unless $description->stdin() eq '';
open( DESC, '> /tmp/xwhep-submit.job-description.txt');
print DESC "#\n";
print DESC "# **********************#\n";
print DESC "# XtremWeb submit #\n";
print DESC "# **********************#\n";
print DESC "#\n";
print DESC "# Date : ", `date`,"\n";
print DESC "#\n";
print DESC "# log file = ", $self->{xtremweb_logfile}, "\n";
print DESC "# JAVA_BINDIR = ", $JAVA_BINDIR, "\n";
print DESC "# JAVA_HOME = ", $JAVA_HOME, "\n";
print DESC "# JAVA_INSTALL_PATH = ", $JAVA_INSTALL_PATH, "\n";
print DESC "# email = ", $description->email_address(), "\n";
print DESC "# queue = ", $description->queue(), "\n";
print DESC "# project = ", $description->project(), "\n";
print DESC "# host count = ", $description->host_count(), "\n";
print DESC "# directory = ", $description->directory(), "\n";
print DESC "# executable = ", $description->executable(), "\n";
my $zipErr = "/tmp/ziperr.txt";
`rm -f $zipErr`;
my $zipEnv = "";
if( -d $description->directory())
{
$zipEnv="/tmp/toto.zip";
`rm -f $zipEnv`;
my $workingDir=$description->directory();
my $currentDir;
chomp($currentDir = `pwd`);
print DESC "# current dir = $currentDir\n";
chdir($workingDir);
my $currentDir2;
chomp($currentDir2 = `pwd`);
print DESC "# current dir2 = $currentDir2\n";
my $CMD="/usr/bin/zip -r $zipEnv $workingDir/* >> $zipErr 2>&1";
print DESC "# zipping working dir command = $CMD\n";
chomp($result = `$CMD`);
my $RC = $?;
chdir($currentDir);
my $currentDir3;
chomp($currentDir3 = `pwd`);
print DESC "# current dir3 = $currentDir3\n";
print DESC "# zipping working dir return code = $RC\n";
print DESC "# zipping working dir result = $result\n";
}
if( -f $description->executable())
{
if($zipEnv eq "")
{
$zipEnv="/tmp/toto.zip";
`rm -f $zipEnv`;
}
my $executable=$description->executable();
chomp($result = `file $executable`);
my $RC0 = $?;
print DESC "# file return code = $RC0\n";
print DESC "# file result = $result\n";
my $executable=$description->executable();
my $executableBasename;
chomp($executableBasename = `basename $executable`);
my $CMD="/usr/bin/zip -j $zipEnv $executable >> $zipErr 2>&1";
print DESC "# zipping executable command = $CMD\n";
chomp($result = `$CMD`);
my $RC = $?;
chdir($currentDir);
print DESC "# zipping executable return code = $RC\n";
print DESC "# zipping executable result = $result\n";
#
# glite executable is always a bash script
#
$command = "$command bash $executableBasename";
}
# @arguments = $description->arguments();
# foreach(@arguments)
# {
# if(ref($_))
# {
# $self->log("# Xtremweb submit argument error");
# return Globus::GRAM::Error::RSL_ARGUMENTS;
# }
# print DESC "# __arguments__ = ", $_, "\n";
# $command = $command." ".$_;
# }
if(($description->stdin() ne "") && ($description->stdin() ne "/dev/null")) {
if($zipEnv eq "")
{
$zipEnv="/tmp/toto.zip";
`rm -f $zipEnv`;
}
print DESC "# stdin = \"", $description->stdin(), "\"\n";
my $stdin=$description->stdin();
my $CMD="/usr/bin/zip -j $zipEnv $stdin >> $zipErr 2>&1";
print DESC "# zipping executable command = $CMD\n";
chomp($result = `$CMD`);
my $RC = $?;
chdir($currentDir);
print DESC "# zipping stdin return code = $RC\n";
print DESC "# zipping stdin result = $result\n";
}
else {
print DESC "# stdin not defined (or set to /dev/null)\n";
}
print DESC "# stdout = ", $description->stdout(), "\n";
print DESC "# stderr = ", $description->stderr(), "\n";
if($zipEnv ne "")
{
$command = "$command --xwenv $zipEnv";
}
print DESC "\n\n";
print DESC "# command = $command\n";
print DESC "\n\n";
chomp($result = `$command`);
my $return_code = $?;
print DESC "# command return code = $return_code\n";
print DESC "# command result = $result\n";
# return Globus::GRAM::Error::JOBTYPE_NOT_SUPPORTED;
if($zipEnv ne "")
{
# `rm -f $zipEnv`;
}
if($return_code == 0)
{
print DESC "# successful submit, setting state to PENDING\n";
chomp($job_id = `echo $result | cut -d ' ' -f 1`);
print DESC "# job_id = $job_id\n";
close( DESC );
#system("$cache_pgm -cleanup-url $script_url");
$self->log("# Xtremweb submit successful, setting state to PENDING");
return {JOB_ID => $job_id,
JOB_STATE => Globus::GRAM::JobState::PENDING };
}
#system("$cache_pgm -cleanup-url $tag/pbs_job_script.$$");
$self->log("# Xtremweb submit unsuccessful");
close( DESC );
return Globus::GRAM::Error::INVALID_SCRIPT_REPLY;
}
############################################################
#
# This method retreives job status from XtremWeb
# This retreives job definition from Globus::GRAM::JobDescription
# This uses JobDescription->jobid() as the XtremWeb UID
#
############################################################
sub poll
{
my $self = shift;
my $description = $self->{JobDescription};
my $job_id = $description->jobid();
my $state;
my $status_line;
my $exit_code;
$self->log("polling job $job_id");
open( DESC, "> /tmp/xwhep-status.job-description-$job_id.txt");
print DESC "#\n";
print DESC "# **********************#\n";
print DESC "# XtremWeb status #\n";
print DESC "# **********************#\n";
print DESC "#\n";
print DESC "# Date : ", `date`,"\n";
print DESC "#\n";
print DESC "# JAVA_BINDIR = ", $JAVA_BINDIR, "\n";
print DESC "# JAVA_HOME = ", $JAVA_HOME, "\n";
print DESC "# JAVA_INSTALL_PATH = ", $JAVA_INSTALL_PATH, "\n";
print DESC "# email = ", $description->email_address(), "\n";
print DESC "# queue = ", $description->queue(), "\n";
print DESC "# project = ", $description->project(), "\n";
print DESC "# host count = ", $description->host_count(), "\n";
print DESC "# directory = ", $description->directory(), "\n";
print DESC "# job_id = ", $job_id, "\n";
print DESC "# stdin = ", $description->stdin(), "\n";
print DESC "# stdout = ", $description->stdout(), "\n";
print DESC "# stderr = ", $description->stderr(), "\n";
my $result;
$result = (grep($job_id, $self->pipe_out_cmd($xwstat, '--xwformat xml ', $job_id)))[1];
# get the exit code of the qstat command. for info search $CHILD_ERROR
# in perlvar documentation.
$exit_code = $? >> 8;
print DESC "# result = ", $result, "\n";
print DESC "# exit_code = ", $exit_code, "\n";
my $parser = new XML::Parser;
$parser->setHandlers( Start => \&xml_startElement,
End => \&xml_endElement,
Char => \&xml_characterData,
Default => \&xml_default);
if($exit_code == 5)
{
$self->log("xwstat unknown UID");
print DESC "# unknown UID \n";
close( DESC);
$state = Globus::GRAM::JobState::JOB_CANCEL_FAILED;
return {JOB_STATE => $state};
}
my $data = $parser->parse($result);
print DESC "# workAttributes[UID] = ".$workAttributes{'UID'}."\n";
print DESC "# workAttributes[STATUS] = ".$workAttributes{'STATUS'}."\n";
if($workAttributes{'STATUS'} eq "PENDING")
{
$state = Globus::GRAM::JobState::PENDING;
print DESC "# set state = PENDING \n";
}
elsif($workAttributes{'STATUS'} eq "WAITING")
{
$state = Globus::GRAM::JobState::PENDING;
print DESC "# set state = PENDING \n";
}
elsif($workAttributes{'STATUS'} eq "ERROR")
{
$state = Globus::GRAM::JobState::JOB_EXECUTION_FAILED;
print DESC "# set state = SUSPENDED \n";
}
elsif($workAttributes{'STATUS'} eq "RUNNING")
{
$state = Globus::GRAM::JobState::ACTIVE;
print DESC "# set state = ACTIVE \n";
}
elsif($workAttributes{'STATUS'} eq "COMPLETED")
{
# $self->log("qstat rc is 153 == Unknown Job ID == DONE");
# $state = Globus::GRAM::JobState::DONE;
# $self->nfssync( $description->stdout() )
# if $description->stdout() ne '';
# $self->nfssync( $description->stderr() )
# if $description->stderr() ne '';
if($description->stdout() ne '') {
my $currentDir;
chomp($currentDir = `pwd`);
print DESC "# current dir = $currentDir\n";
chdir("/tmp/");
my $currentDir2;
chomp($currentDir2 = `pwd`);
print DESC "# current dir2 = $currentDir2\n";
my $CMD="$xwresult $job_id";
print DESC "# xwresult command = $CMD\n";
chomp($result = `$CMD`);
my $RC = $?;
print DESC "# xwresult return code = $RC\n";
if($RC == 0) {
chdir($currentDir);
my $dirName=$job_id;
$dirName=~s/:/_/g;
$dirName="/tmp/bash/$dirName";
print DESC "# dirName = $dirName\n";
my $srcFile = "$dirName/stdout.out";
# print DESC "# copy(",$srcFile,",", $description->stdout(),")\n";
# copy($srcFile, $description->stdout())
# or print DESC "# $srcFile cannot be copied.\n";
open(OUPUT, '>> ',$description->stdout());
open (INPUT, $srcFile);
my $ligne;
while ($ligne = ) {
chop ($ligne);
print OUTPUT $ligne;
}
close INPUT;
close OUTPUT;
}
$self->nfssync( $description->stdout() );
}
$state = Globus::GRAM::JobState::DONE;
print DESC "# set state = DONE \n";
}
else
{
# This else is reached by an unknown response from xtremweb.
$self->log("xwstat returned an unknown response. Telling JM to ignore this poll");
print DESC "# set state UNKNOWN \n";
close( DESC);
return {};
}
print DESC "# poll done... \n";
close( DESC);
return {JOB_STATE => $state};
}
############################################################
#
# This method cancels job from XtremWeb
# This retreives job definition from Globus::GRAM::JobDescription
# This uses JobDescription->jobid() as the XtremWeb UID
#
############################################################
sub cancel
{
my $self = shift;
my $description = $self->{JobDescription};
my $job_id = $description->jobid();
$self->log("cancel job $job_id");
open( DESC, "> /tmp/xwhep-remove.job-description-$job_id.txt");
print DESC "#\n";
print DESC "# **********************#\n";
print DESC "# XtremWeb remove #\n";
print DESC "# **********************#\n";
print DESC "# Date : ", `date`,"\n";
print DESC "#\n";
print DESC "#\n";
print DESC "# JAVA_BINDIR = ", $JAVA_BINDIR, "\n";
print DESC "# JAVA_HOME = ", $JAVA_HOME, "\n";
print DESC "# JAVA_INSTALL_PATH = ", $JAVA_INSTALL_PATH, "\n";
print DESC "# email = ", $description->email_address(), "\n";
print DESC "# queue = ", $description->queue(), "\n";
print DESC "# project = ", $description->project(), "\n";
print DESC "# host count = ", $description->host_count(), "\n";
print DESC "# directory = ", $description->directory(), "\n";
print DESC "# job_id = ", $job_id, "\n";
close (DESC);
my $result;
$result = (grep($job_id, $self->pipe_out_cmd($xwdelete, $job_id)))[1];
if($? == 0)
{
return { JOB_STATE => Globus::GRAM::JobState::FAILED }
}
return Globus::GRAM::Error::JOB_CANCEL_FAILED();
}
############################################################
#
# This is the XML parser start element handler
#
############################################################
sub xml_startElement {
my( $parseinst, $element, %attrs ) = @_;
# print "* startElement element = ".$element."\n";
if ($element ne "WORK") {
# print "* startElement not work, exiting \n";
return;
}
%workAttributes = %attrs;
# print "* startElement attrs[UID] = ".$attrs{'UID'}."\n";
# print "* startElement attrs[ISSERVICE] = ".$attrs{'ISSERVICE'}."\n";
}
############################################################
#
# This is the XML parser end element handler
#
############################################################
sub xml_endElement {
my( $parseinst, $element ) = @_;
# print "* endElement element = ".$element."\n";
}
############################################################
#
# This is the XML parser character handler
#
############################################################
sub xml_characterData {
my( $parseinst, $data ) = @_;
if (($tag eq "title") || ($tag eq "summary")) {
$data =~ s/\n|\t//g;
# print "$data";
}
}
############################################################
#
# This is the XML parser default handler
#
############################################################
sub xml_default {
my( $parseinst, $data ) = @_;
# you could do something here
}
1;