Commit 24fc893c authored by Doak, Peter W's avatar Doak, Peter W

watch_server added to repo

parent db1aed82
log_name: out # whatever you call your 2>&1 output file
dropoff: 20 # minutes since the log last changed; older logs are no longer picked up
host: 128.219.185.137 #current CNMS influxdb server
port: 8086 #influx db port
root_dir: /your/root/dir
user_name: your_id
job_prefix: 'abinit' #generally the code
job_suffix: 'condo' #generally the server
influx: True
start: 'ABINIT is a project of the Universite Catholique de Louvain'
header:
'npfft, npband, npspinor and npkpt': [[6, 'npfft'],[7, 'npband'],[8, 'npspinor'],[9, 'npkpt']]
    'nkpt=': [[2, 'kpoints']]
'natom =': [[3, 'natoms']]
'mband =': [[4, 'nband']]
parse:
# regex: column, tick_name
'ETOT': [[3, 'toten'], [4, 'eacc']]
finish:
- 'JOB DONE'
idle_count: 30
find_sleep: 60
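
How a parse map is consumed: each key is a regex searched against every new log line; each [column, tick_name] pair picks a 1-based whitespace-split column and names the series it feeds. A minimal sketch of the mechanics (the sample ETOT line is illustrative, not real ABINIT output):

import re

parse = {'ETOT': [[3, 'toten'], [4, 'eacc']]}   # one entry from the map above
line = " ETOT  5  -24.7961  1.2E-08"            # illustrative log line

for find_this, then_do in parse.items():
    if re.search(find_this, line):
        for column, tick_name in then_do:
            value = line.split()[column - 1]    # columns are 1-based
            print("{} {}".format(tick_name, value))  # toten -24.7961, then eacc 1.2E-08
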
log_name: out
dropoff: 20 # minutes since the log last changed; older logs are no longer picked up
host: 128.219.185.137
port: 8086
root_dir: /your/calculation/root/dir
user_name: your_id
job_prefix: 'espresso' #generally the code
job_suffix: 'condo' #generally the server
influx: True
start: 'PWSCF.*starts'
init:
'tcpu': 0
header:
    'Parallel version \(MPI\), running on': [[6, 'nproc']] # parens escaped for the regex; column 6 is the processor count
'K-points.*npool': [[5, 'kpar']]
'number of k points': [[5, 'kpoints']]
'number of atoms/cell': [[5, 'natoms']]
'number of Kohn-Sham states': [[5, 'nstates']]
parse:
# regex: column, tick_name
'total cpu time spent': [[9, 'tcpu']]
'total energy': [[4, 'toten']]
'estimated': [[5, 'eacc']]
'Total force': [[4, 'tforce']]
finish:
- 'JOB DONE'
idle_count: 30
find_sleep: 60
curl -i -XPOST http://128.219.185.137:8086/query --data-urlencode "q=CREATE DATABASE \"jobs\""
creates the jobs database
select moving_average("value",10) INTO "toten_ma" FROM "toten" GROUP BY "job_name","user_name","code"
select "value" INTO "toten_ma" FROM "toten" GROUP BY "job_name","user_name","code"
import sys
import yaml
import re
import time
import subprocess
import os
import multiprocessing as mp
import socket
import requests
from ws_pygtail import Pygtail
def read_output(log):
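    # Graphite-style sink: tail the log with Pygtail and push
    # "job_tag.tick_name value unix_seconds" lines over a raw TCP socket
    # to config['host']:config['port'].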
ld = config['launch_dir']
print "Watching: {}".format(log)
(directory, out_file) = os.path.split(log)
job_name = os.path.basename(directory)
job_tag = "{}.{}.{}.{}".format(config['user_name'],
config['job_prefix'],
job_name,
config['job_suffix'])
print "job tag: {}".format(job_tag)
sock = socket.create_connection((config['host'],config['port']))
print "Peer Name", sock.getpeername()
print "Sock Name", sock.getsockname()
watch_log = open(os.path.join(ld,"{}.log".format(job_tag)),'w')
watch_log.write("Watching {}\n".format(log))
while True:
offset_file = os.path.join(ld,"{}.off".format(job_tag))
for line in Pygtail(log,
offset_file=offset_file):
            for find_this, then_do in config['parse'].iteritems():
                match = re.search(find_this, line)
                if match:
                    # each parse entry is a list of [column, tick_name] pairs
                    for aindex, label in then_do:
                        the_value = (line.split())[aindex - 1]
                        the_value = re.sub(r':', r'', the_value)
                        print "{} {}".format(label, the_value)
                        watch_log.write("{}: {}\n".format(label, the_value))
                        sock.sendall("{}.{} {} {}\n".format(job_tag,
                                                            label,
                                                            the_value,
                                                            int(time.time())))
watch_log.flush()
time.sleep(10)
def read_output_influx(log):
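    # InfluxDB sink: tail the log with Pygtail, collect line-protocol points
    # for start/header/parse matches, and POST each batch to /write?db=jobs.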
ld = config['launch_dir']
print "Watching: {}".format(log)
(directory, out_file) = os.path.split(log)
job_name = os.path.basename(directory)
job_tag = "{}.{}.{}.{}".format(config['user_name'],
config['job_prefix'],
job_name,
config['job_suffix'])
print "job tag: {}".format(job_tag)
watch_log = open(os.path.join(ld,"{}.log".format(job_tag)),'w')
watch_log.write("Watching {}\n".format(log))
watch_start = ('start' in config)
    header_element_found = {x: False for x in config.get('header', {})}  # guard: config may lack 'header'; currently unused
while True:
offset_file = os.path.join(ld,"{}.off".format(job_tag))
influx_lines = []
for line in Pygtail(log,
offset_file=offset_file):
if watch_start:
match = re.search(config['start'],line)
if match:
if 'init' in config:
for measurement, value in config['init'].iteritems():
influx_point = ("{},user_name={},code={},"
"job_name={},host={} value={} {}\n"
.format(measurement,
config['user_name'],
config['job_prefix'],
job_name,
config['job_suffix'],
value,
int(time.time())))
influx_lines.append(influx_point)
influx_point = ("header,user_name={},code={},"
"host={} job_name=\"{}\" {}\n"
.format(config['user_name'],
config['job_prefix'],
config['job_suffix'],
job_name,
int(time.time())))
influx_lines.append(influx_point)
if 'header' in config:
for find_this, then_do in config['header'].iteritems():
match = re.search(find_this, line)
if match:
                        try:
                            # header entries are also lists of [column, tick_name] pairs
                            for aindex, label in then_do:
                                the_value = (line.split())[aindex - 1]
                                print line.split()
                                the_value = re.sub(r':', r'', the_value)
                                print "Header -- {}: {}".format(label, the_value)
                                influx_point = ("header,user_name={},code={},"
                                                "host={} {}={} {}\n"
                                                .format(config['user_name'],
                                                        config['job_prefix'],
                                                        config['job_suffix'],
                                                        label,
                                                        the_value,
                                                        int(time.time())))
                                influx_lines.append(influx_point)
                                watch_log.write("Header -- {}: {}\n".format(label, the_value))
                        except IndexError as ie:
                            print "found", find_this, "index error, no column ", ie, " in: "
                            print line
for find_this, then_do in config['parse'].iteritems():
match = re.search(find_this, line)
if match:
try:
for aindex, label in then_do:
the_value = (line.split())[aindex-1]
the_value = re.sub(r':',r'',the_value)
print "{} {}".format(label, the_value)
influx_point = ("{},user_name={},code={},"
"job_name={},host={} value={} {}\n"
.format(label,
config['user_name'],
config['job_prefix'],
job_name,
config['job_suffix'],
the_value,
int(time.time())))
influx_lines.append(influx_point)
watch_log.write("{}: {}\n".format(label, the_value))
except IndexError as ie:
print "found", find_this, "index error, no column ", ie, " in: "
print line
if len(influx_lines) > 0:
r = requests.post("http://{}:{}/write?db=jobs&precision=s".format(config['host'],config['port']), data="".join(influx_lines))
print r.text
watch_log.flush()
time.sleep(10)
if __name__ == '__main__':
conf_file = open(sys.argv[1],'r')
    config = yaml.safe_load(conf_file)
config['launch_dir'] = os.getcwd()
if 'root_dir' in config:
os.chdir(config['root_dir'])
match = re.search('lustre', config['root_dir'])
if match:
config['lustre'] = True
if 'dropoff' in config:
dropoff = "-{}".format(int(config['dropoff']))
else:
dropoff = "-20"
print "CONFIG -- WATCH SERVER"
print config
    # Bookkeeping used in the main loop
    watch_idlers = {}     # log path -> consecutive idle find cycles
    log_2_process = {}    # log path -> watcher process
while True:
logs = []
if 'lustre' in config:
lustre_logs = subprocess.check_output(['lfs','find', '-ctime', '-1', '-name', config['log_name'], './'])
lustre_logs = lustre_logs.splitlines()
print lustre_logs
for lustre_log in lustre_logs:
                mod_seconds = time.time() - os.stat(lustre_log).st_mtime
                print time.time(), '-', os.stat(lustre_log).st_mtime, mod_seconds, (-1 * int(dropoff) * 60)
                # dropoff is a negative minute count (e.g. "-20"), so the
                # right-hand side is a positive number of seconds
                if mod_seconds < -1 * int(dropoff) * 60:
logs.append(lustre_log)
else:
logs = subprocess.check_output(['find', './' ,'-name', config['log_name'], '-cmin', dropoff])
logs = logs.split('\n')
found_activity = []
real_logs = [log for log in logs if len(log) > 1]
for log in real_logs:
if log in log_2_process and log_2_process[log].is_alive():
found_activity.append(log)
if log in watch_idlers:
del watch_idlers[log]
else:
if 'influx' in config:
log_2_process[log] = mp.Process(target=read_output_influx, args=[log])
else:
log_2_process[log] = mp.Process(target=read_output, args=[log])
log_2_process[log].daemon = True
log_2_process[log].start()
found_activity.append(log)
        # Any watcher whose log no longer shows up in the find results is idling;
        # count idle passes and terminate it once idle_count is exceeded.
        for proc_log in [idle_log for idle_log in log_2_process if idle_log not in found_activity]:
            if proc_log in watch_idlers:
                watch_idlers[proc_log] += 1
            else:
                watch_idlers[proc_log] = 1
            if watch_idlers[proc_log] > config["idle_count"]:
                del watch_idlers[proc_log]
                log_2_process[proc_log].terminate()
                del log_2_process[proc_log]
                # TODO: delete the stale offset file here
print "sleeping..."
time.sleep(config['find_sleep'])
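
The server takes one of the YAML configs above as its only argument (e.g. python watch_server.py espresso.yaml, filename assumed). A sketch of the tag a watched log receives, mirroring the job_tag logic in the functions above (the run directory run01 is hypothetical):

import os
import yaml

with open('espresso.yaml') as fh:        # assumed config filename
    config = yaml.safe_load(fh)

log = os.path.join(config['root_dir'], 'run01', config['log_name'])
job_name = os.path.basename(os.path.dirname(log))
job_tag = "{}.{}.{}.{}".format(config['user_name'], config['job_prefix'],
                               job_name, config['job_suffix'])
print(job_tag)   # e.g. your_id.espresso.run01.condo
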
log_name: OUTCAR
dropoff: 20 # minutes since the log last changed; older logs are no longer picked up
host: 128.219.185.137
port: 8086
root_dir: /lustre/or-hydra/cades-cnms/your_directory
user_name: your_name
influx: True
job_prefix: 'vasp' #generally the code
job_suffix: 'condo' #generally the server
start: 'executed on'
header:
'running on': [[3, 'nproc']]
'distrk': [[7, 'kpar']]
'distr': [[6, 'ncores_per_band']]
'NKPTS': [[4, 'n_kpts']]
'NIONS': [[11, 'n_ions']]
parse:
# regex: column, tick_name
'LOOP:': [[4, 'loop']]
'LOOP\+:': [[4, 'loop_plus']]
'TOTEN': [[5, 'toten']]
finish:
- 'General timing and accounting'
idle_count: 30 # find cycles (one per find_sleep seconds) after dropoff before an idle watcher is killed
find_sleep: 60
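
Series written this way can be read back through the same HTTP API, e.g. for dashboards; a hedged sketch (host/port from the configs, measurement and tag names from the parse maps, the query itself illustrative):

import requests

params = {
    'db': 'jobs',
    'q': "SELECT \"value\" FROM \"toten\" WHERE \"code\" = 'vasp' ORDER BY time DESC LIMIT 5",
}
r = requests.get('http://128.219.185.137:8086/query', params=params)
print(r.json())
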
#!/usr/bin/python -tt
# -*- coding: utf-8 -*-
# pygtail - a python "port" of logtail2
# Copyright (C) 2011 Brad Greenlee <brad@footle.org>
#
# Derived from logcheck <http://logcheck.org>
# Copyright (C) 2003 Jonathan Middleton <jjm@ixtab.org.uk>
# Copyright (C) 2001 Paul Slootman <paul@debian.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from __future__ import print_function
from os import stat
from os.path import exists, getsize
import sys
import glob
import gzip
from optparse import OptionParser
__version__ = '0.7.0'
PY3 = sys.version_info[0] == 3
if PY3:
text_type = str
else:
text_type = unicode
def force_text(s, encoding='utf-8', errors='strict'):
if isinstance(s, text_type):
return s
return s.decode(encoding, errors)
class Pygtail(object):
"""
Creates an iterable object that returns only unread lines.
Keyword arguments:
offset_file File to which offset data is written (default: <logfile>.offset).
paranoid Update the offset file every time we read a line (as opposed to
only when we reach the end of the file (default: False))
every_n Update the offset file every n'th line (as opposed to only when
we reach the end of the file (default: 0))
on_update Execute this function when offset data is written (default False)
copytruncate Support copytruncate-style log rotation (default: True)
"""
def __init__(self, filename, offset_file=None, paranoid=False, copytruncate=True,
every_n=0, on_update=False):
self.filename = filename
self.paranoid = paranoid
self.every_n = every_n
self.on_update = on_update
self.copytruncate = copytruncate
self._offset_file = offset_file or "%s.offset" % self.filename
self._offset_file_inode = 0
self._offset = 0
self._since_update = 0
self._fh = None
self._rotated_logfile = None
# if offset file exists and non-empty, open and parse it
if exists(self._offset_file) and getsize(self._offset_file):
offset_fh = open(self._offset_file, "r")
(self._offset_file_inode, self._offset) = \
[int(line.strip()) for line in offset_fh]
offset_fh.close()
if self._offset_file_inode != stat(self.filename).st_ino or \
stat(self.filename).st_size < self._offset:
# The inode has changed or filesize has reduced so the file
# might have been rotated.
# Look for the rotated file and process that if we find it.
self._rotated_logfile = self._determine_rotated_logfile()
def __del__(self):
if self._filehandle():
self._filehandle().close()
def __iter__(self):
return self
def next(self):
"""
Return the next line in the file, updating the offset.
"""
try:
line = self._get_next_line()
except StopIteration:
# we've reached the end of the file; if we're processing the
# rotated log file, we can continue with the actual file; otherwise
# update the offset file
if self._rotated_logfile:
self._rotated_logfile = None
self._fh.close()
self._offset = 0
# open up current logfile and continue
try:
line = self._get_next_line()
except StopIteration: # oops, empty file
self._update_offset_file()
raise
else:
self._update_offset_file()
raise
if self.paranoid:
self._update_offset_file()
elif self.every_n and self.every_n <= self._since_update:
self._update_offset_file()
return line
def __next__(self):
"""`__next__` is the Python 3 version of `next`"""
return self.next()
def readlines(self):
"""
Read in all unread lines and return them as a list.
"""
return [line for line in self]
def read(self):
"""
Read in all unread lines and return them as a single string.
"""
lines = self.readlines()
if lines:
try:
return ''.join(lines)
except TypeError:
return ''.join(force_text(line) for line in lines)
else:
return None
def _is_closed(self):
if not self._fh:
return True
try:
return self._fh.closed
except AttributeError:
if isinstance(self._fh, gzip.GzipFile):
# python 2.6
return self._fh.fileobj is None
else:
raise
def _filehandle(self):
"""
Return a filehandle to the file being tailed, with the position set
to the current offset.
"""
if not self._fh or self._is_closed():
filename = self._rotated_logfile or self.filename
if filename.endswith('.gz'):
self._fh = gzip.open(filename, 'r')
else:
self._fh = open(filename, "r", 1)
self._fh.seek(self._offset)
return self._fh
def _update_offset_file(self):
"""
Update the offset file with the current inode and offset.
"""
if self.on_update:
self.on_update()
offset = self._filehandle().tell()
inode = stat(self.filename).st_ino
fh = open(self._offset_file, "w")
fh.write("%s\n%s\n" % (inode, offset))
fh.close()
self._since_update = 0
def _determine_rotated_logfile(self):
"""
We suspect the logfile has been rotated, so try to guess what the
rotated filename is, and return it.
"""
rotated_filename = self._check_rotated_filename_candidates()
if rotated_filename and exists(rotated_filename):
if stat(rotated_filename).st_ino == self._offset_file_inode:
return rotated_filename
# if the inode hasn't changed, then the file shrank; this is expected with copytruncate,
# otherwise print a warning
if stat(self.filename).st_ino == self._offset_file_inode:
if self.copytruncate:
return rotated_filename
else:
sys.stderr.write(
"[pygtail] [WARN] file size of %s shrank, and copytruncate support is "
"disabled (expected at least %d bytes, was %d bytes).\n" %
(self.filename, self._offset, stat(self.filename).st_size))
return None
def _check_rotated_filename_candidates(self):
"""
Check for various rotated logfile filename patterns and return the first
match we find.
"""
# savelog(8)
candidate = "%s.0" % self.filename
if (exists(candidate) and exists("%s.1.gz" % self.filename) and
(stat(candidate).st_mtime > stat("%s.1.gz" % self.filename).st_mtime)):
return candidate
# logrotate(8)
# with delaycompress
candidate = "%s.1" % self.filename
if exists(candidate):
return candidate
# without delaycompress
candidate = "%s.1.gz" % self.filename
if exists(candidate):
return candidate
rotated_filename_patterns = (
# logrotate dateext rotation scheme - `dateformat -%Y%m%d` + with `delaycompress`
"-[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]",
# logrotate dateext rotation scheme - `dateformat -%Y%m%d` + without `delaycompress`
"-[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9].gz",
# logrotate dateext rotation scheme - `dateformat -%Y%m%d-%s` + with `delaycompress`
"-[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]-[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]",
# logrotate dateext rotation scheme - `dateformat -%Y%m%d-%s` + without `delaycompress`
"-[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]-[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9].gz",
# for TimedRotatingFileHandler
".[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]",
# chainRename pattern
"_[0-9][0-9]"
)
for rotated_filename_pattern in rotated_filename_patterns:
candidates = glob.glob(self.filename + rotated_filename_pattern)
if candidates:
candidates.sort()
return candidates[-1] # return most recent
# no match
return None
def _get_next_line(self):
line = self._filehandle().readline()
if not line:
raise StopIteration
self._since_update += 1
return line
def main():
# command-line parsing
cmdline = OptionParser(usage="usage: %prog [options] logfile",
description="Print log file lines that have not been read.")
cmdline.add_option("--offset-file", "-o", action="store",
help="File to which offset data is written (default: <logfile>.offset).")
cmdline.add_option("--paranoid", "-p", action="store_true",
help="Update the offset file every time we read a line (as opposed to"
" only when we reach the end of the file).")
cmdline.add_option("--every-n", "-n", action="store",
help="Update the offset file every n'th time we read a line (as opposed to"
" only when we reach the end of the file).")
cmdline.add_option("--no-copytruncate", action="store_true",
help="Don't support copytruncate-style log rotation. Instead, if the log file"
" shrinks, print a warning.")
cmdline.add_option("--version", action="store_true",
help="Print version and exit.")
options, args = cmdline.parse_args()
if options.version:
print("pygtail version", __version__)
sys.exit(0)
if (len(args) != 1):
cmdline.error("Please provide a logfile to read.")
    if options.every_n:
        options.every_n = int(options.every_n)
    else:
        options.every_n = 0
    pygtail = Pygtail(args[0],
                      offset_file=options.offset_file,
                      paranoid=options.paranoid,
                      every_n=options.every_n,
                      copytruncate=not options.no_copytruncate)
    for line in pygtail:
        sys.stdout.write(line)

if __name__ == "__main__":
    main()
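
A minimal usage sketch for the Pygtail class above, matching how watch_server drives it (the log and offset file names are hypothetical):

from ws_pygtail import Pygtail

# Each pass yields only lines appended since the stored offset;
# the read position persists across runs in the offset file.
for line in Pygtail('out', offset_file='myjob.off'):
    print(line.rstrip())
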