#! /usr/bin/python import sys import os import signal import time import tempfile import skyd_func from tiny import hide_env, start, say # 'boss' is the machine controlling skyd_wait # grunts are machines controlling skyd_orbit boss = os.environ['HOSTNAME'].split('.')[0] stime = time.time() long_pause = 86400/2 report_delay = 10 report_prefix = 'skyd_' def skyd_load(claim): """ #+ # NAME: # skyd_load # PURPOSE: # Read conf file and set up the dictionary needed for # tracking the skyd_orbit processes launched on all grunts. # SIDE EFFECTS: # Several entries in the conf file are updated/added. # 'pid' is set with pid of skyd_wait # 'boss' is set to hostname of machine that controls skyd_wait # 'time' is set to time at which skyd_wait was started # # If 'max_load' (# processes per grunt) is not set then # 'max_load' is set to 2. # IF 'grunts' (list of machines that run skyd_orbit) is not # set then 'grunts' is set to 'boss' (i.e. the daemon will # run htm_orbit locally only). # If 'reportdir' (dir for report files) is not set then it # is set to $TUB. # PROCEDURE: # The global variable new_runs is set up as a dictionary # with one entry for each grunt listed in the conf file. # The entry for each grunt is a list of max_proc elements. # Each element is a dictionary with entries describing # the process. Each process is characterized by: # 'status': 'dead','start' or 'runs' # 'wmark' : '' or 'watermark' # 'result': '', 'runs','done','kill' # Processes are initialized here as 'dead' with a # blank watermark. The watermark is a filename of type # /skyd_ with a unique set # of characters (created by tempfile.mkstemp). # MODIFICATION HISTORY: # DEC-2005, Paul Hick (UCSD/CASS; pphick@ucsd.edu) #- """ global new_runs # Counts processes global reportdir csay = 'skyd_load' status = dict ([ ('number', 0), ('message', '') ]) status = skyd_func.skyd_read_conf(cffile,claim,status) if status['number'] != 0: say(csay,'E','cf',status['message']+'\nterminated') conf = status.pop('conf') if claim: # Only for the initial load conf['pid' ] = '%d'%pid # Daemon process id conf['boss'] = boss # Machine controlling deamon conf['time'] = time.strftime('%Y_%j_%H%M%S') # Time of (re)load # Count number of groups. Set cur_group to the last one # so that the next orbit processed will be from group 0. cur_group = 0 for key in conf.keys(): if key.find('group_') == 0: cur_group += 1 conf['cur_group'] = '%d'%(cur_group-1) if conf.has_key('max_load'): max_load = int(conf['max_load']) else: max_load = 2 conf['max_load'] = '%d'%max_load if conf.has_key('grunts'): grunts = conf['grunts'].split(',') else: grunts = [boss] conf['grunts'] = grunts if conf.has_key('reportdir'): reportdir = os.path.expandvars(conf['reportdir']) if not os.path.isdir(reportdir): say(csay,'I','#'+reportdir,'does not exist') reportdir = '' else: reportdir = '' if reportdir == '': reportdir = os.environ['TUB'] conf['reportdir'] = hide_env(reportdir) say(csay,'I','#'+conf['reportdir'],'is report directory') status['conf'] = conf status = skyd_func.skyd_write_conf(cffile,status) if status['number'] != 0: say(csay,'E','cf',status['message']+'\nterminated') else: max_load = int(conf['max_load']) grunts = conf['grunts'].split(',') grunts_load = dict() for i in range(len(grunts)): if ':' not in grunts[i]: grunts_load[grunts[i]] = max_load else: run = grunts[i].split(':') grunts[i] = run[0] grunts_load[grunts[i]] = min(int(run[1]),max_load) # Initalize the list of runs # with status='dead' and no watermark. new_runs = dict() for grunt in grunts: new_runs[grunt] = [] for run in range(grunts_load[grunt]): new_runs[grunt].append(skyd_func.skyd_empty_run()) return def skyd_reload(): """ #+ # NAME: # skyd_reload # PURPOSE: # Rereads the conf file and resets the dictionary needed for # tracking the skyd_orbit processes. # CALLING SEQUENCE: # skyd_reload # PROCEDURE: # skyd_reload is called when a SIGHUP signal is received. # # skyd_reload merges the current list of processes in new_runs # with the list of unfinished processes in old_runs. # Then htm_load is called to set up a fresh new_runs list. # # The old_runs array will only contain processes that are # marked as 'start' or 'runs'. # MODIFICATION HISTORY: # DEC-2005, Paul Hick (UCSD/CASS; pphick@ucsd.edu) #- """ global new_runs global old_runs global terminator csay = 'skyd_reload' say(csay,'I','reload','\nconfiguration file\n') # Save the current old_runs list. Note that on the first # reload old_runs is an empty directory. # old_runs needs to be integrated later on with the new # old_runs list. The new_runs list becomes the new old_runs list saved_runs = old_runs #old_runs = new_runs # NOT RIGHT old_runs = dict() # Instead do this for grunt in new_runs.keys(): old_runs[grunt] = [] old_runs[grunt].extend(new_runs[grunt]) # Remove all 'dead' processes from the old_runs list grunts = old_runs.keys() for grunt in grunts: run = skyd_func.skyd_not_running(old_runs[grunt]) while run != -1: old_runs[grunt][run:run+1] = [] run = skyd_func.skyd_not_running(old_runs[grunt]) # Remove all grunts that have no processes left for grunt in grunts: if len(old_runs[grunt]) == 0: old_runs.pop(grunt) # old_runs now only contains grunts that have at least # on process left, and all processes are not marked as # 'dead', i.e. they will be 'start' or 'runs'. # The same is true for the saved_runs array, since it # was constructed by the above section of code in an # earlier call to this routine. # Add the saved_runs list (the old old_runs list) to the # new old_runs list. grunts = old_runs.keys() while len(saved_runs) > 0: grunt = (saved_runs.keys())[0] if grunts.count(grunt) == 0: old_runs[grunt] = saved_runs[grunt] else: old_runs[grunt].extend(saved_runs[grunt]) saved_runs.pop(grunt) # At this point saved_runs should be an empty dictionary # and old_runs contains all unfinished processes. # old_runs might also be empty; if not then none of the # processes in old_runs are marked 'dead', i.e. these # are the leftover skyd_orbit runs that haven't reported back # that they have finished. skyd_load(False) # skyd_load sets up a new_runs dictionary with all processes # marked as 'dead'. If old_runs has grunts in common with # new_runs then transfer the processes in old_runs to new_runs. grunts = new_runs.keys() for grunt in old_runs.keys(): if grunts.count(grunt) != 0: # Grunt on new and old list n = min([len(new_runs[grunt]),len(old_runs[grunt])]) new_runs[grunt][0:n] = old_runs[grunt][0:n] old_runs[grunt][0:n] = [] # Remove transferred runs from old list # Remove all grunts in old_runs that have no processes left # Note that there still may be processes left in old_runs; either # because the grunt is not anymore in new_list, or because there are # more than max_load processes running (I don't think this can happen) grunts = old_runs.keys() for grunt in grunts: if len(old_runs[grunt]) == 0: old_runs.pop(grunt) if terminator: say(csay,'I','terminator','cleared\n') terminator = False # Fire up the skyd_orbit processes grunts = new_runs.keys() count = 0 for grunt in grunts: count = skyd_start(grunt, count) return def skyd_find_run( reports, lst_runs ): """ #+ # NAME: # skyd_find_run # PURPOSE: # Match a file on a list of files to a process on a specfied list # CALLING SEQUENCE: # rtn = skyd_find_run( reports, lst_runs ) # INPUTS: # reports string array # list of files in reportdir # skyd_orbit runs will send SIGUSR1 signals back # to skyd_wait signalling that a 'report' file was # put in reportdir reporting about their status. # These report files should be in the 'reports' array. # lst_runs dictionary # old_runs or new_runs list of skyd_orbit processes # OUTPUTS: # rtn dictionary # if a matching report is found # then the entries are # 'report' name of report from 'reports' # 'grunt' name of grunt running process # 'run' number of run on grunt # 'status' either 'start' or 'runs' # 'result' 'runs' if 'status'='start' # either 'done' or 'kill' if # 'status'='runs' # if no matching report is found # then only one entry is present: # 'report' set to null string # PROCEDURE: # An attemps is made to match one of the processes in # lst_runs to one of the reports. # If a process is marked 'start' then the matching # report file has name _runs or _kill. # If a process is marked 'runs' then the matching # report file has name _done or _kill # If a process is marked 'dead' then no report # is expected for that process. # MODIFICATION HISTORY: # DEC-2005, Paul Hick (UCSD/CASS; pphick@ucsd.edu) #- """ csay = 'skyd_find_run' run_found = False all_grunts = lst_runs.keys() for grunt in all_grunts: # Loop over all grunts for run in range(len(lst_runs[grunt])): # Loop over all runs report = lst_runs[grunt][run]['report'] # /skyd_ status = lst_runs[grunt][run]['status'] if status == 'dead': if report != '': say(csay,'E',grunt,'run %d is %s, but has a watermark\nterminated'%(run,status)) elif status == 'start': if report == '': say(csay,'E',grunt,'run %d is %s, but has no watermark\nterminated'%(run,status)) name = os.path.split(report)[1] # report_htm_ result = 'runs' run_found = reports.count(name+'_'+result) != 0 if run_found: break result = 'kill' run_found = reports.count(name+'_'+result) != 0 if run_found: break elif status == 'runs': if report == '': say(csay,'E',grunt,'run %d is %s, but has no watermark\nterminated'%(run,status)) name = os.path.split(report)[1] result = 'done' run_found = reports.count(name+'_'+result) != 0 if run_found: break result = 'kill' run_found = reports.count(name+'_'+result) != 0 if run_found: break if run_found: rtn = dict({'report' : report+'_'+result, 'wmark' : lst_runs[grunt][run]['wmark'], 'grunt' : grunt , 'run' : run , 'status' : status , 'result' : result } ) break else: rtn = dict( {'report':''} ) return rtn def skyd_find(): """ #+ # NAME: # skyd_find # PURPOSE: # Match a file in reportdir against a process in the old_runs # or new_runs dictionary. # CALLING SEQUENCE: # rtn = skyd_find() # OUTPUTS: # rtn dictionary dictionary returned from skyd_find_run # with one extra item if a matching report # was found: # 'list' is set to 'old_runs' or 'new_runs' # reflecting the process list # CALLS: # skyd_find_run # PROCEDURE: # The content of reportdir is picked up. This will include all # report files, and possible some other stuff too. # First the old_runs is checked for a matching report file. # If unsucessfull, the new_runs list is tried. # It is essential that the old_runs list is tried first. # Ideally this list is empty already. If not we want to clean # it out before processing new_runs. # MODIFICATION HISTORY: # DEC-2005, Paul Hick (UCSD/CASS; pphick@ucsd.edu) #- """ global reports_claimed rtn = os.listdir(reportdir) # All files in reportdir reports = [] # Loop over all reports in reportdir. # Omit files that do not start with report_prefix # Omit files that are on the reports_claimed list for report in rtn: # Loop over all files in reportdir if reports_claimed.count(report) != 0: say('skyd_find','I','report','avoiding collision with %s'%report) continue # Omit reports on the reports_claimed list if report.find(report_prefix) != 0: continue # Omit files not starting with report_prefix reports.append(report) rtn = skyd_find_run( reports, old_runs ) if rtn['report'] != '': # Add report to reports_claimed list reports_claimed.append(rtn['report']) rtn['list'] = 'old_runs' return rtn rtn = skyd_find_run( reports, new_runs ) if rtn['report'] != '': # Add report to reports_claimed list reports_claimed.append(rtn['report']) rtn['list'] = 'new_runs' return rtn def skyd_start(grunt,count): """ #+ # NAME: # skyd_start # PURPOSE: # Start indexing runs for specified grunts until the # maximum number of processes are running. # CALLING SEQUENCE: # new_count = skyd_start(grunt,count) # INPUTS: # grunt string name of computer on SMEI subnet # count integer number of mintutes to delay start # of next skyd_orbit run # OUTPUTS: # new_count integer input count, plus number of # new processes started here # PROCEDURE: # All processes marked as 'dead' in new_runs[grunt] # are selected to be launched again. # skyd_orbit is set up to be submitted to the at batch # queue on grunt with a delay of count minutes. # For each process launched count is incremented by one. # The final count value is returned. # # skyd_start is typically run in a loop like this. # # count = 0 # for grunt in grunts: # count = skyd_start(grunt, count) # # As a result skyd_orbit are launched across all grunts # at intervals of roughly 1 minutes. This should reduce the # risk of multiple skyd_orbit runs access the conf file or # the user catalogue at the same time (skyd_orbit actually # provides some defense against this, but better safe than sorry). # MODIFICATION HISTORY: # DEC-2005, Paul Hick (UCSD/CASS; pphick@ucsd.edu) #- """ global new_runs csay = 'skyd_start' if terminator: say(csay,'I','terminator','is set; no new runs started') return count # Processes are marked 'dead', 'start' or 'runs' # Look for 'dead' processes, mark them as 'start' # and build a single cmd with multiple skyd_orbit calls in it. status = 'dead' result = 'start' if grunt == boss: one_cmd = local_cmd else: one_cmd = remote_cmd count_save = count for run in range(len(new_runs[grunt])): if new_runs[grunt][run]['status'] == status: # NamedTemporaryFile actually creates the file # (and deletes it when it closed) # So this also tests that reportdir is writable by boss. report = tempfile.NamedTemporaryFile('w+b',-1,'',report_prefix,reportdir).name new_runs[grunt][run]['report'] = report new_runs[grunt][run]['wmark' ] = os.path.split(report)[1][len(report_prefix):] new_runs[grunt][run]['status'] = result new_runs[grunt][run]['time' ] = time.strftime('%Y_%j_%H%M%S') if count == count_save: cmd = '' else: cmd += '; ' cmd += one_cmd%(report,count) count += 1 print if count == count_save: # Grunt is fully committed say(csay,'I',grunt,'no new process '+result) else: say(csay,'I',grunt,result+' %d process(es)'%(count-count_save)) if grunt != boss: # For execution on a remote machine the command is send over # using ssh. The command is submitted to the local at queue, # because we don't want to wait for the ssh connection to complete # (remember that cmd submits htmd_orbit to the at queue on the # remote machine). cmd = 'echo "ssh '+grunt+' \\"'+cmd+'\\"" | at now' # This goes wrong sometimes with an error message # "interrupted system call". Don't know why. We just # retry a couple of times. If it it just won't work then # set the terminator flag. n = 0 while n < 3: try: os.popen(cmd) except: print cmd say(cSay,'W','failed','system commmand; try again in a sec') n += 1 time.sleep(10) else: break return count def skyd_sighup(signum, frame): """ #+ # NAME: # skyd_sighup # PURPOSE # MODIFICATION HISTORY: # DEC-2005, Paul Hick (UCSD/CASS; pphick@ucsd.edu) #- """ skyd_reload() return def skyd_alarm(signum, frame): """ #+ # NAME: # skyd_alarm # PURPOSE # MODIFICATION HISTORY: # DEC-2005, Paul Hick (UCSD/CASS; pphick@ucsd.edu) #- """ csay = 'skyd_alarm' say(csay,'W','