#!/usr/bin/python """ GIP plugin updating CE and cluster attributes based on MAUI information. This plugin is an alternative to the old PBS-based GIP plugin. It takes into accounts specific MAUI features like standing reservations. """ __version__ = "2.0.3-2" __author__ = "Michel Jouvin , Cedric Duprilot " import sys import logging import syslog import getopt import string import os import re from optparse import OptionParser try: from TorqueMauiConfParser import * except: sys.stderr.write("Failed to load TorqueMauiConfParser module. Check PYTHONPATH is appropriate.") sys.exit(1) # Initializations verbosity = 0 # convert triplet hours:minutes:seconds into seconds def convertHhMmSs(timeStr): diagPattern = re.compile(r'(?P\d+):(?P\d+):(?P\d+)') matcher = diagPattern.search(timeStr) if matcher: hours = int(matcher.group('hour')) minutes = int(matcher.group('minute')) secondes = int(matcher.group('second')) time = hours * 3600 + minutes * 60 + secondes elif timeStr != "-": time = 0 return int(time) # note: this class works around the bug in the python logging # package fixed in patch #642974. If the version of python-logging # is upgraded, this class can be replaced by logging.SysLogHandler class SLHandler(logging.Handler): def __init__(self, ident, logopt=0, facility=syslog.LOG_USER): logging.Handler.__init__(self) self.ident = ident self.logopt = logopt self.facility = facility self.mappings = { logging.DEBUG: syslog.LOG_DEBUG, logging.INFO: syslog.LOG_INFO, logging.WARN: syslog.LOG_WARNING, logging.ERROR: syslog.LOG_ERR, logging.CRITICAL: syslog.LOG_CRIT, } def encodeLevel(self, level): return self.mappings.get(level, syslog.LOG_INFO) def emit(self, record): syslog.openlog(self.ident, self.logopt, self.facility) msg = self.format(record) prio = self.encodeLevel(record.levelno) syslog.syslog(prio, msg) syslog.closelog() def abort_without_output(msg): logging.error(msg) logging.error("Exiting without output, GIP will use static values") sys.exit(2) def debug(level,msg): if level <= verbosity: sys.stderr.write("%s\n" % msg) # Initialize logger logging.getLogger("").setLevel(logging.INFO) # syslog handler shdlr = SLHandler("lcg-info-dynamic-maui") logging.getLogger("").addHandler(shdlr) # stderr handler stdrhdlr = logging.StreamHandler() fmt=logging.Formatter("%(asctime)s lcg-info-dynamic-maui:" + " %(message)s","%F %T") logging.getLogger("").addHandler(stdrhdlr) stdrhdlr.setFormatter(fmt) # Initialize parser parser = OptionParser() parser.add_option('-l','--ce-ldif', dest='ce_ldif_file', action='store', default=None, help="LDIF file describing CE resources to update") parser.add_option('-s','--cluster-ldif', dest='cluster_ldif_file', default=None, action='store', help="LDIF file describing cluster resources to update") parser.add_option('-H','--host','--server', dest='hostname', default=None, action='store', help="Host name of Torque/MAUI server") parser.add_option('-v', '--debug', dest='verbosity', action='count', default=0, help='Increase verbosity level for debugging (on stderr)') parser.add_option('--diagnose-output', dest='diag_output', action='store', default=None, help='File containing a diagnose -r output. Be sure it is up-to-date.') parser.add_option('--flavor', dest='ce_flavor', default='all', action='store', help="CE flavor (lcg or cream or all)") parser.add_option('--max-normal-slots', dest='max_normal_slots', action='store', type="int", default=None, help="Maximum number of normal slots") parser.add_option('--version', dest='version', action='store_true', default=False, help='Display various information about this script') options, args = parser.parse_args() if options.verbosity: verbosity = options.verbosity if options.version: debug (0,"Version %s written by %s" % (__version__,__author__)) debug (0,__doc__) sys.exit(0) if not options.ce_ldif_file and not options.cluster_ldif_file: parser.print_usage() abort_without_output("Either option --ce-ldif or --cluster-ldif must be specified.") if options.hostname: schedhost = options.hostname else: schedhost = os.popen('hostname -f').readline().strip('\n') debug (1,"No scheduler host specified. Using default (%s)" % (schedhost)) if options.ce_flavor and not re.match('all|cream|lcg',options.ce_flavor): abort_without_output("Invalid value for --flavor option (%s): must be 'all', 'cream' or 'lcg'." % (options.ce_flavor)) # Initializations ce_dn_to_queue = {} subcluster_dn_list = [] output = [] # Retrieve CE DNs from static LDIF file if options.ce_ldif_file: try: ce_ldif = open(options.ce_ldif_file,'r') except IOError: abort_without_output("Error opening config file " + options.ce_ldif_file) # gets the dns containing the keyword GlueVOViewLocalID or GlueCEUniqueID # and put this dn in ce_dn_to_queue list. # Regexp is not exactly the same for LCG and CREAM CE. if options.ce_flavor == 'cream': jobmanager_prefix = 'cream' jobmanager_name = 'pbs' elif options.ce_flavor == 'lcg': jobmanager_prefix = 'jobmanager' jobmanager_name = 'pbs|lcgpbs' else: jobmanager_prefix = 'jobmanager|cream' jobmanager_name = 'pbs|lcgpbs' ceIDPattern = re.compile(r'^dn:\s+GlueCEUniqueID=[\w\-\.:]+/+(?:'+jobmanager_prefix+')-(?:'+jobmanager_name+')-(?P[\w\-\.]+),') for line in ce_ldif: matcher = ceIDPattern.match(line) if matcher: ce_dn_to_queue[line.strip('\n')] = matcher.group('queue') ce_ldif.close() debug (1,'Number of CE DN found in %s: %d' %(options.ce_ldif_file,len(ce_dn_to_queue))) else: debug (1,'No CE static LDIF file specified. Ignoring update of CE parameters') # Retrieving subcluster DNs from static LDIF file if options.cluster_ldif_file: try: subcluster_ldif = open(options.cluster_ldif_file,'r') except IOError: abort_without_output("Error opening config file " + options.cluster_ldif_file) # gets the DNs containing the keyword GlueSubClusterUniqueID # and put this DN in subcluster_dn_list list subclusterIDPattern = re.compile(r'dn:\s+GlueSubClusterUniqueID=') for line in subcluster_ldif: if subclusterIDPattern.search(line): subcluster_dn_list.append(line.strip('\n')) subcluster_ldif.close() else: debug (1,'No cluster static LDIF file specified. Ignoring update of subcluster parameters') # Initialize MAUI and PBS configuration parsers torqueMauiConf = TorqueMauiConfParser(schedhost,verbosity=verbosity,diagOutputFile=options.diag_output) # From Torque, get Torque version, Total and Free CPUs torqueVersion = torqueMauiConf.getTorqueVersion() totalJobSlots = torqueMauiConf.getProcNum(activeOnly=True) #Get number of SDJ job slots configured, used (globally) SDJSlotsStats = torqueMauiConf.getQueueSDJSlots() totalSDJSlots = SDJSlotsStats['total'] usedSDJSlots = SDJSlotsStats['used'] #Get total number of used job slots (properly counting MPI jobs) totalUsedSlots = torqueMauiConf.getTotalUsedSlots() # Compute the total number of slots for non-SDJ jobs and the amount used normalJobSlots = totalJobSlots - totalSDJSlots; if normalJobSlots < 0: debug(1,'Number of slots for normal jobs (%d) negative. Reset to 0.' % normalJobSlots) normalJobSlots = 0 elif options.max_normal_slots and normalJobSlots > options.max_normal_slots: debug(1,'Number of slots for normal jobs (%d) greater than the maximum number defined. Resetting to %d.' % (normalJobSlots,options.max_normal_slots)) normalJobSlots = options.max_normal_slots freeNormalSlots = normalJobSlots - (totalUsedSlots - usedSDJSlots) if freeNormalSlots < 0: debug(1,'Number of free slots for normal jobs negative (%d). Reset to 0.' % freeNormalSlots) freeNormalSlots = 0 elif freeNormalSlots > normalJobSlots: debug(1,'Free slots for normal jobs (%d) > total slot number. Reset to %d.' % (freeNormalSlots,normalJobSlots)) freeNormalSlots = normalJobSlots usedNormalSlots = normalJobSlots - freeNormalSlots # Compute the total number of actually available job slots. # Will be the maximum number of available slots for any queue. totalFreeSlots = totalJobSlots - totalUsedSlots if totalFreeSlots < 0: debug(1,'Total number of available slots negative (%d). Reset to 0.' % totalFreeSlots) totalFreeSlots = 0 debug(1,'Number of job slots on active nodes: normal=%d, SDJ=%d' %(normalJobSlots,totalSDJSlots)) debug(1,'Number of used job slots on active nodes: %d (normal:%d, SDJ:%d)' %(totalUsedSlots,usedNormalSlots,usedSDJSlots)) debug(1,'Number of available job slots (max for any queue with SDJ enabled): %d' %(totalFreeSlots)) # Build Glue information for each queue having an entry in the CE static LDIF file. # DN associated with a queue is in ce_dn_to_queue, retrieved for CE static LDIF file. if options.ce_ldif_file: for dn in ce_dn_to_queue: queue = ce_dn_to_queue[dn] output.append(dn) # Retrieve queue params. If the returned value is None, it means # the queue doesn't exist. In this case stop processing here for this queue. queueParams = torqueMauiConf.getQueueParams(queue) # if not queueParams: # logging.error("Queue '%s' doesn't exist" % (queue)) # debug(1,"Queue '%s' doesn't exist" % (queue)) # continue # Compute total/free CPUs (slots) freeJobSlots = 0 queueSDJSlots = torqueMauiConf.getQueueSDJSlots(queue) queueTotalSlots = normalJobSlots + queueSDJSlots['total'] freeJobSlots = freeNormalSlots + queueSDJSlots['free'] if freeJobSlots < 0: debug(1,'Number of free job slots negative (%d). Reset to 0.' % (freeJobSlots)) freeJobSlots = 0 elif freeJobSlots > totalFreeSlots: debug(1,'Number of free slots in queue (%d) > total number of free slots. Reset to %d.' % (freeJobSlots,totalFreeSlots)) freeJobSlots = totalFreeSlots output.append('GlueCEInfoLRMSVersion: '+torqueVersion) # For historical reasons, there are 2 attributes representing total number of CPUs output.append('GlueCEInfoTotalCPUs: '+str(queueTotalSlots)) output.append('GlueCEPolicyAssignedJobSlots: '+str(queueTotalSlots)) # For historical reasons, there are 2 attributes representing number of free slots output.append('GlueCEStateFreeCPUs: '+str(freeJobSlots)) output.append('GlueCEStateFreeJobSlots: '+str(freeJobSlots)) # Get the value of the following attributes: # GlueCEPolicyMaxCPUTime, GlueCEPolicyMaxTotalJobs, GlueCEPolicyPriority, # GlueCEPolicyMaxRunningJobs, GlueCEPolicyMaxWallClockTime, GlueCEStateStatus queueEnabled = 0 queueStarted = 0 MaxCPUTime = 0 MaxPCPUTime = 0 #queueParams = torqueMauiConf.getQueueParams(queue) # Simple resources if 'max_queuable' in queueParams: output.append('GlueCEPolicyMaxTotalJobs: %s' % (queueParams['max_queuable'][0])) if 'Priority' in queueParams: output.append('GlueCEPolicyPriority: %s' % (queueParams['Priority'][0])) if 'max_running' in queueParams: output.append('GlueCEPolicyMaxRunningJobs: %s' % (queueParams['max_running'][0])) else: output.append('GlueCEPolicyMaxRunningJobs: %s' % normalJobSlots) if 'walltime' in queueParams['resources_max']: output.append('GlueCEPolicyMaxWallClockTime: %d' % (convertHhMmSs(queueParams['resources_max']['walltime'][0])/60)) # For GlueCEPolicyMaxCPUTime, use Torque cput if pcput (per process limit) is not set or > cput. # Else use pcput (real limit for single process jobs, not very meaningful anyway for MPI jobs). # If none are defined, keep the default value (don't redefine here). if 'cput' in queueParams['resources_max']: MaxCPUTime = convertHhMmSs(queueParams['resources_max']['cput'][0]) if 'pcput' in queueParams['resources_max']: MaxPCPUTime = convertHhMmSs(queueParams['resources_max']['pcput'][0]) if (MaxCPUTime > 0) and ((MaxPCPUTime == 0) or (MaxCPUTime < MaxPCPUTime)): output.append('GlueCEPolicyMaxCPUTime: %d' % (MaxCPUTime/60)) elif (MaxPCPUTime > 0): output.append('GlueCEPolicyMaxCPUTime: %d' % (MaxPCPUTime/60)) # Set GlueCEStateStatus according to 'enabled' and 'started' if 'enabled' in queueParams and (queueParams['enabled'][0].lower() == 'true'): queueEnabled = True else: queueEnabled = False if 'started' in queueParams and (queueParams['started'][0].lower() == 'true'): queueStarted = True else: queueStarted = False if queueEnabled == 1 and queueStarted == 1: queueStatus = 'Production' elif queueEnabled == 1: queueStatus = 'Queueing' elif queueStarted == 1: queueStatus = 'Draining' else: queueStatus = 'Closed' output.append('GlueCEStateStatus: %s' % (queueStatus)) # End DN attributes by an empty line output.append('\n') # Update SubCluster logical CPUs # This part is maintained for backward compatibility but is not compliant # with the agreement that GlueSubClusterLogicalCPUs is a static value representing # the number of cores configured (and not active) in the cluster. # It is also not able to cope with multiple homogeneous subclusters in a cluster. if options.cluster_ldif_file: for dn in subcluster_dn_list: output.append(dn) output.append('GlueSubClusterLogicalCPUs: '+str(normalJobSlots)) output.append('\n') # Print results for line in output: print line