Commit 030c354d authored by Doak, Peter W's avatar Doak, Peter W
Browse files

actual watchserver work

parent 977033a5
#So as I worked on finding the correct create_server call I was deleting instances with the horizon gui.
cloud.create_server('auto_ws', image=u'CADES_Ubuntu16.04_v20170804_1', flavor=u'm1.small', wait=True,network=u'or_provider_general_extnetwork1', auto_ip=True)
cloud.create_volume(8, wait=True, image=u'CADES_Ubuntu16.04_v20170804_1', name=u'ws_auto_vol')
cloud.create_server('auto_ws', volume=u'ws_auto_vol', flavor=u'm1.small', wait=True,network=u'or_provider_general_extnetwork1', auto_ip=True)
1096/14: cloud.create_server('auto_ws', volumes=[u'ws_auto_vol'], flavor=u'm1.small', wait=True,network=u'or_provider_general_extnetwork1', boot_volume='ws_auto_vol', auto_ip=True)
1096/15: cloud.create_server('auto_ws', image=u'CADES_Ubuntu16.04_v20170804_1', flavor=u'm1.small', volumes=[u'ws_auto_vol'], wait=True,network=u'or_provider_general_extnetwork1', boot_volume='ws_auto_vol', auto_ip=True)
1096/16: cloud.create_server('auto_ws', image=u'ws_auto_vol', flavor=u'm1.small', volumes=[u'ws_auto_vol'], wait=True,network=u'or_provider_general_extnetwork1', boot_volume='ws_auto_vol', auto_ip=True)
cloud.delete_volume(u'ws_auto_vol')
cloud.get_volume(u'ws_auto_vol')
cloud.list_servers()
1096/21: [s['name'] for s in cloud.list_servers()]
1096/22: cloud.get_volume(u'ws_auto_vol')
1096/23: cloud.delete_volume(u'ws_auto_vol', force=True)
1096/24: cloud.create_server('auto_ws', image=u'CADES_Ubuntu16.04_v20170804_1', flavor=u'm1.small', wait=True,network=u'or_provider_general_extnetwork1', boot_from_volume=True, boot_volume='ws_auto_vol', auto_ip=True)
#This turns out to be the way to boot an instance from an image while creating a new volume from that image
cloud.create_server('auto_ws', image=u'CADES_Ubuntu16.04_v20170804_1', flavor=u'm1.small', wait=True,network=u'or_provider_general_extnetwork1', boot_from_volume=True, auto_ip=True)
import sys
import yaml
import re
import time
import subprocess
import os
import multiprocessing as mp
import socket
import requests
from ws_pygtail import Pygtail
@syncio.coroutine
def read_output_influx(log):
ld = config['launch_dir']
print "Watching: {}".format(log)
(directory, out_file) = os.path.split(log)
job_name = os.path.basename(directory)
job_tag = "{}.{}.{}.{}".format(config['user_name'],
config['job_prefix'],
job_name,
config['job_suffix'])
print "job tag: {}".format(job_tag)
watch_log = open(os.path.join(ld,"{}.log".format(job_tag)),'w')
watch_log.write("Watching {}\n".format(log))
watch_start = ('start' in config)
while True:
offset_file = os.path.join(ld,"{}.off".format(job_tag))
influx_lines = []
for line in Pygtail(log,
offset_file=offset_file):
if watch_start:
match = re.search(config['start'],line)
if match:
if 'init' in config:
for measurement, value in config['init'].iteritems():
influx_point = ("{},user_name={},code={},"
"job_name={},host={} value={} {}\n"
.format(measurement,
config['user_name'],
config['job_prefix'],
job_name,
config['job_suffix'],
value,
int(time.time())))
influx_lines.append(influx_point)
influx_point = ("header,user_name={},code={},"
"host={} job_name=\"{}\" {}\n"
.format(config['user_name'],
config['job_prefix'],
config['job_suffix'],
job_name,
int(time.time())))
influx_lines.append(influx_point)
if 'header' in config:
for find_this, then_dos in config['header'].iteritems():
match = re.search(find_this, line)
if match:
try:
for then_do in then_dos:
the_value = (line.split())[then_do[0]-1]
print line.split()
the_value = re.sub(r':',r'',the_value)
print "Header -- {}: {}".format(then_do[1], the_value)
influx_point = ("header,user_name={},code={},"
"host={} {}={} {}\n"
.format(config['user_name'],
config['job_prefix'],
config['job_suffix'],
then_do[1],
the_value,
int(time.time())))
influx_lines.append(influx_point)
watch_log.write("Header -- {}: {}\n".format(then_do[1], the_value))
except IndexError as ie:
print "found", find_this, "index error, no column ", ie, " in: "
print line
for find_this, then_do in config['parse'].iteritems():
match = re.search(find_this, line)
if match:
try:
for aindex, label in then_do:
the_value = (line.split())[aindex-1]
the_value = re.sub(r':',r'',the_value)
#case where vasp runs force into its error value
fmatch = re.search(r'([-\dEe\.]+)\[.*', the_value)
if fmatch:
the_value = fmatch.group(1)
print "{} {}".format(label, the_value)
influx_point = ("{},user_name={},code={},"
"job_name={},host={} value={} {}\n"
.format(label,
config['user_name'],
config['job_prefix'],
job_name,
config['job_suffix'],
the_value,
int(time.time())))
influx_lines.append(influx_point)
watch_log.write("{}: {}\n".format(label, the_value))
except IndexError as ie:
print "found", find_this, "index error, no column ", ie, " in: "
print line
if len(influx_lines) > 0:
r = requests.post("http://{}:{}/write?db=jobs&precision=s".format(config['host'],config['port']), data="".join(influx_lines))
print r.text
watch_log.flush()
time.sleep(10)
if __name__ == '__main__':
conf_file = open(sys.argv[1],'r')
config = yaml.load(conf_file)
config['launch_dir'] = os.getcwd()
if 'root_dir' in config:
os.chdir(config['root_dir'])
match = re.search('lustre', config['root_dir'])
if match:
config['lustre'] = True
if 'dropoff' in config:
dropoff = "-{}".format(int(config['dropoff']))
else:
dropoff = "-20"
print "CONFIG -- WATCH SERVER"
print config
#Lists used in main loop
watch_list = [] #what is the difference
watch_idlers = {}
log_2_process = {}
while True:
logs = []
if 'lustre' in config:
lustre_logs = subprocess.check_output(['lfs','find', '-ctime', '-1', '-name', config['log_name'], './'])
lustre_logs = lustre_logs.splitlines()
print lustre_logs
for lustre_log in lustre_logs:
mod_seconds = time.time() - os.stat(lustre_log).st_mtime
print time.time(),'-',os.stat(lustre_log).st_ctime, mod_seconds, (-1 * int(dropoff) * 60)
if mod_seconds < -1 * int(dropoff) * 60:
logs.append(lustre_log)
else:
logs = subprocess.check_output(['find', './' ,'-name', config['log_name'], '-cmin', dropoff])
logs = logs.split('\n')
found_activity = []
real_logs = [log for log in logs if len(log) > 1]
for log in real_logs:
if log in log_2_process and log_2_process[log].is_alive():
found_activity.append(log)
if log in watch_idlers:
del watch_idlers[log]
else:
if 'influx' in config:
log_2_process[log] = mp.Process(target=read_output_influx, args=[log])
else:
log_2_process[log] = mp.Process(target=read_output, args=[log])
log_2_process[log].daemon = True
log_2_process[log].start()
found_activity.append(log)
for proc_log in [idle_log for idle_log in log_2_process if idle_log not in found_activity]:
if idle_log in watch_idlers:
watch_idlers[idle_log] += 1
else:
watch_idlers[idle_log] = 1
if watch_idlers[idle_log] > config["idle_count"]:
del watch_idlers[idle_log]
log_2_process[log].terminate()
del log_2_process[idle_log]
#os.remove("{}.off".format(idle_log))
#can't do this unless we can get the reader to tell us what the log_tag is
#delete offset file here
print "sleeping..."
time.sleep(config['find_sleep'])
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment