# pylint: disable=W0102,W0212
import os
import sys
import copy
import tarfile
import pickle as pickle
import more_itertools as mit
import radical.utils as ru
from .entity import Entity
# ------------------------------------------------------------------------------
#
[docs]class Session(object):
# --------------------------------------------------------------------------
#
[docs] def __init__(self, src, stype, sid=None, _entities=None, _init=True):
'''
Create a radical.analytics session for analysis.
The session is created from a set of traces, which usually have been
produced by a Session object in the RCT stack, such as radical.pilot or
radical.entk. Profiles are accepted in two forms: in a directory, or in
a tarball (of such a directory). In the latter case, the tarball are
extracted into `$TMP`, and then handled just as the directory case.
If no `sid` (session ID) is specified, that ID is derived from the
directory name.
'''
if _init:
# if no sid is given, derive it from the src path
sid, src, tgt, ext = self._get_sid(sid, src)
else:
assert sid
assert src
tgt = None
self._sid = sid
self._src = src
self._stype = stype
if tgt and not os.path.exists(tgt):
# need to extract
print(('extract tarball to %s' % tgt))
try:
if ext in ['tbz', 'tar.bz', 'tbz2', 'tar.bz2']:
tf = tarfile.open(name=src, mode='r:bz2')
tf.extractall(path=os.path.dirname(tgt))
elif ext in ['tgz, tar.gz']:
tf = tarfile.open(name=src, mode='r:gz')
tf.extractall(path=os.path.dirname(tgt))
else:
raise ValueError('cannot handle extension %s' % ext)
except Exception as e:
raise RuntimeError(
'Cannot extract tarball: %s' % repr(e)) from e
if tgt:
self._src = tgt
# print 'sid: %s [%s]' % (sid, stype)
# print 'src: %s' % src
if stype == 'radical':
# src is expected to point either to a single profile, or to
# a directory tree containing profiles
if not src:
raise ValueError('RA session types need `src` specified')
profiles = list()
if os.path.isfile(src):
profiles.append(src)
else:
def _walkdir(path, profiles=[]):
for root, dirs, files in os.walk(path):
for f in files:
if f.endswith('.prof'):
profiles.append('%s/%s' % (root, f))
for d in dirs:
_walkdir('%s/%s' % (path, d), profiles)
return profiles
profiles = _walkdir(src)
profiles = ru.read_profiles(profiles, sid=sid)
self._profile, accuracy = ru.combine_profiles(profiles)
self._description = {'tree' : dict(),
'entities' : list(),
'hostmap' : dict(),
'accuracy' : 0.0}
elif stype == 'radical.pilot':
try:
import radical.pilot.utils as rpu
except Exception as e:
raise RuntimeError('radical.analytics requires the '
'radical.pilot module to analyze this '
'session - please install it.') from e
self._profile, accuracy, hostmap = \
rpu.get_session_profile(sid=sid, src=self._src)
self._description = \
rpu.get_session_description(sid=sid, src=self._src)
self._description['accuracy'] = accuracy
self._description['hostmap'] = hostmap
elif stype == 'radical.entk':
try:
import radical.entk.utils as reu
except Exception as e:
raise RuntimeError('radical.analytics requires the '
'radical.entk module to analyze this '
'session - please install it.') from e
self._profile, accuracy, hostmap \
= reu.get_session_profile (sid=sid, src=self._src)
self._description = reu.get_session_description(sid=sid, src=self._src)
self._description['accuracy'] = accuracy
self._description['hostmap'] = hostmap
else:
raise ValueError('unsupported session type [%s]' % stype)
self._t_start = None
self._t_stop = None
self._ttc = None
self._log = ru.Logger('radical.analytics')
self._rep = ru.Reporter('radical.analytics')
# user defined time offset
self._tzero = 0.0
# internal state is represented by a dict of entities:
# dict keys are entity uids (which are assumed to be unique per
# session), dict values are ra.Entity instances.
self._entities = dict()
if _init:
self._initialize_entities(self._profile)
# we do some bookkeeping in self._properties where we keep a list of
# property values around which we encountered in self._entities.
self._properties = dict()
if _init:
self._initialize_properties()
# print('session loaded')
# FIXME: we should do a sanity check that all encountered states and
# events are part of the respective state and event models
# self.consistency()
# --------------------------------------------------------------------------
#
@staticmethod
def _get_sid(sid, src):
tgt = None
ext = None
if not os.path.exists(src):
raise ValueError('src [%s] does not exist' % src)
if os.path.isdir(src):
pass
elif os.path.isfile(src):
# src is a file - we assume its a tarball and extract it
if src.endswith('.prof'):
# use as is
tgt = src
elif src.endswith('.tgz') or \
src.endswith('.tbz') :
tgt = src[:-4]
ext = src[-3:]
elif src.endswith('.tbz2'):
tgt = src[:-5]
ext = src[-4:]
elif src.endswith('.tar.gz') or \
src.endswith('.tar.bz') :
tgt = src[:-7]
ext = src[-6:]
elif src.endswith('.tar.bz2'):
tgt = src[:-8]
ext = src[-7:]
elif src.endswith('.prof'):
tgt = None
ext = None
else:
raise ValueError('src does not look like a tarball or profile')
if not sid:
if tgt: to_check = tgt
else : to_check = src
if to_check.endswith('/'):
to_check = src[:-1]
sid = os.path.basename(to_check)
return sid, src, tgt, ext
# --------------------------------------------------------------------------
#
def __getstate__(self):
state = {
'sid' : self._sid,
'src' : self._src,
'stype' : self._stype,
'profile' : self._profile,
'description' : self._description,
't_start' : self._t_start,
't_stop' : self._t_stop,
'ttc' : self._ttc,
'entities' : self._entities,
'properties' : self._properties,
}
return state
# --------------------------------------------------------------------------
#
def __setstate__(self, state):
    '''
    Restore the session from a state dict as created by `__getstate__`.

    Logger and reporter are not part of the state and are created anew.
    The user defined time offset is read from `tzero` and falls back to
    `0.0` when the state does not carry that key.
    '''

    self._sid         = state['sid']
    self._src         = state['src']
    self._stype       = state['stype']
    self._profile     = state['profile']
    self._description = state['description']
    self._t_start     = state['t_start']
    self._t_stop      = state['t_stop']
    self._ttc         = state['ttc']
    self._entities    = state['entities']
    self._properties  = state['properties']

    # `__init__` is not run on unpickling, so `_tzero` has to be set here
    self._tzero       = state.get('tzero', 0.0)

    self._log = ru.Logger('radical.analytics')
    self._rep = ru.Reporter('radical.analytics')
# --------------------------------------------------------------------------
#
@staticmethod
def create(src, stype, sid=None, _entities=None, _init=True, cache=True):
sid, src, _, _ = Session._get_sid(sid, src)
base = ru.get_radical_base('radical.analytics.cache')
cache = '%s/%s.pickle' % (base, sid)
if _entities or not cache:
# no caching
session = Session(src, stype, sid, _entities, _init)
try:
with open(cache, 'rb') as fin:
data = fin.read()
session = pickle.loads(data)
print(('using cache for %s' % sid))
# import pprint
# j = ru.read_json("%s/%s.json" % (src, sid))
# rd = j['pilot'][0]['resource_details']
# session.get(etype='pilot')[0].cfg['resource_details'] = rd
# pprint.pprint(session.get(etype='pilot')[0].cfg)
# with open(cache, 'wb') as fout:
# # session = Session(src, stype, sid, _entities, _init)
# fout.write(pickle.dumps(session, protocol=pickle.HIGHEST_PROTOCOL))
except Exception:
print(('no cache for %s' % sid))
with open(cache, 'wb') as fout:
session = Session(src, stype, sid, _entities, _init)
fout.write(pickle.dumps(session, protocol=pickle.HIGHEST_PROTOCOL))
return session
# --------------------------------------------------------------------------
#
def __deepcopy___(self, memo):
cls = self.__class__
ret = cls(sid=self._sid, src=None, stype=self._stype, _init=False)
memo[id(self)] = ret
for k, v in list(self.__dict__.items()):
setattr(ret, k, copy.deepcopy(v, memo))
return ret
# --------------------------------------------------------------------------
#
def _reinit(self, entities):
'''
After creating a session clone, we have identical sets of descriptions,
profiles, and entities. However, if we apply a filter during the clone
creation, we end up with a deep copy which should have a **different**
set of entities. This method applies that new entity set to such a
cloned session.
'''
self._entities = entities
# FIXME: we may want to filter the session description etc. wrt. to the
# entity types remaining after a filter.
# --------------------------------------------------------------------------
#
@property
def t_start(self):
return self._t_start
@property
def t_stop(self):
return self._t_stop
@property
def ttc(self):
return self._ttc
@property
def t_range(self):
return [self._t_start, self._t_stop]
@property
def uid(self):
return self._sid
@property
def stype(self):
return self._stype
# --------------------------------------------------------------------------
#
def _initialize_entities(self, profile):
    '''
    Fill `self._entities` from the given profile and `self._description`.

    The profile events are grouped by their uid, and one `Entity` is
    created per uid from its events and from the details found for that
    uid in the description tree.

    Raises:
        ValueError: an event carries a time stamp smaller than -1
    '''

    # group the profile events by uid
    by_uid = dict()
    for event in profile:

        # tolerate rounding errors of up to one second
        if event[ru.TIME] < -1:
            raise ValueError('invalid time stamp: %s' % event)

        by_uid.setdefault(event[ru.UID], list()).append(event)

    # create and store one entity per uid found
    for uid in list(by_uid.keys()):

        details = self._description.get('tree', dict()).get(uid, dict())

        # hostid should be handled on RP level
        hostid = self._description.get('hostmap', dict()).get(uid)
        if hostid:
            details['hostid'] = hostid

        self._entities[uid] = Entity(_uid=uid,
                                     _profile=by_uid[uid],
                                     _details=details)
# --------------------------------------------------------------------------
#
def _initialize_properties(self):
'''
populate `self._properties` from `self._entities`. `self._properties`∫
has the following format::
{
'state' : {
'NEW' : 10,
'RUNNING' : 8,
'DONE : 7,
'FAILED' : 1,
'CANCELED' : 2
}
}
We count how often any property value appears in the current set of
entities.
RA knows exactly 4 properties:
- uid (entity identifiers)
- etype (type of entities)
- event (names of events)
- state (state identifiers)
'''
# FIXME: initializing properties can be expensive, and we might not
# always need them anyway. So we can lazily defer this
# initialization stop until the first query which requires them.
# we do *not* look at profile and descriptions anymore, those are only
# evaluated once on construction, in `_initialize_entities()`. Now we
# don't parse all that stuff again, but only re-initialize after
# in-place filtering etc.
self._properties = {'uid' : dict(),
'etype' : dict(),
'event' : dict(),
'state' : dict()}
if self._entities:
self._t_start = sys.float_info.max
self._t_stop = sys.float_info.min
for euid,e in list(self._entities.items()):
self._t_start = min(self._t_start, e.t_start)
self._t_stop = max(self._t_stop, e.t_stop )
if euid in self._properties['uid']:
raise RuntimeError('duplicated uid %s' % euid)
self._properties['uid'][euid] = 1
if e.etype not in self._properties['etype']:
self._properties['etype'][e.etype] = 0
self._properties['etype'][e.etype] += 1
for state in e.states:
if state not in self._properties['state']:
self._properties['state'][state] = 0
self._properties['state'][state] += 1
for event in e.events:
name = event[ru.EVENT]
if name not in self._properties['event']:
self._properties['event'][name] = 0
self._properties['event'][name] += 1
if self._entities:
self._ttc = self._t_stop - self._t_start
# --------------------------------------------------------------------------
#
def _apply_filter(self, etype=None, uid=None, state=None,
                        event=None, time=None):
    '''
    Return the uids of all entities which pass the given filters.

    An entity passes if its type is in `etype`, its uid is in `uid`, at
    least one of its states is in `state`, and at least one of its events
    is in `event` - where unset filters are ignored.  If `time` is set,
    only states and events with a time stamp within `time` (as determined
    by `ru.in_range`) are considered for the `state` and `event` filters.
    '''

    etype = ru.as_list(etype)
    uid   = ru.as_list(uid  )
    state = ru.as_list(state)
    event = ru.as_list(event)
    time  = ru.as_list(time )

    matches = list()
    for eid, entity in list(self._entities.items()):

        if etype and entity.etype not in etype:
            continue

        if uid and entity.uid not in uid:
            continue

        # at least one state within the time ranges has to match
        if state and not any(
                (not time or ru.in_range(stuple[ru.TIME], time))
                and s in state
                for s, stuple in list(entity.states.items())):
            continue

        # at least one event within the time ranges has to match
        if event and not any(
                (not time or ru.in_range(etuple[ru.TIME], time))
                and etuple[ru.EVENT] in event
                for etuple in entity.events):
            continue

        # all filters have been passed
        matches.append(eid)

    return matches
# --------------------------------------------------------------------------
#
def _dump(self):
for uid,entity in list(self._entities.items()):
print(('\n\n === %s' % uid))
entity.dump()
for event in entity.events:
print((' = %s' % event))
for e in entity.events[event]:
print((' %s' % e))
# --------------------------------------------------------------------------
#
def list(self, pname=None):
if not pname:
# return the name of all known properties
return list(self._properties.keys())
if isinstance(pname, list):
return_list = True
pnames = pname
else:
return_list = False
pnames = [pname]
ret = list()
for _pname in pnames:
if _pname not in self._properties:
raise KeyError('no such property known (%s) / %s'
% (_pname, list(self._properties.keys())))
ret.append(list(self._properties[_pname].keys()))
if return_list: return ret
else : return ret[0]
# --------------------------------------------------------------------------
#
def get(self, etype=None, uid=None, state=None, event=None, time=None):
    '''
    Return the list of entities which pass the given filters (see
    `_apply_filter` for the filter semantics).
    '''

    selected = self._apply_filter(etype=etype, uid=uid, state=state,
                                  event=event, time=time)
    entities = self._entities

    return [entities[key] for key in selected]
# --------------------------------------------------------------------------
#
def filter(self, etype=None, uid=None, state=None, event=None, time=None,
                 inplace=True):
    '''
    Reduce the session to the entities which pass the given filters.

    Args:
        etype, uid, state, event, time: filters, see `_apply_filter`
        inplace: if set (default), this session itself is filtered;
                 otherwise a new session is created with the resulting
                 entity set

    Returns:
        the filtered session (`self` for in-place filtering)
    '''

    uids = self._apply_filter(etype=etype, uid=uid, state=state,
                              event=event, time=time)

    if not inplace:
        # create a new session which holds the resulting entity set
        clone = Session(sid=self._sid, stype=self._stype, src=self._src,
                        _init=False)
        clone._reinit(entities={uid:self._entities[uid] for uid in uids})
        clone._initialize_properties()
        return clone

    # only refresh our own state if the entity set actually changed
    if uids != list(self._entities.keys()):
        self._entities = {uid:self._entities[uid] for uid in uids}
        self._initialize_properties()

    return self
# --------------------------------------------------------------------------
#
def describe(self, mode=None, etype=None):
if mode not in [None, 'state_model', 'state_values',
'event_model', 'relations',
'statistics']:
raise ValueError('describe parameter "mode" invalid')
if not etype and not mode:
# no entity filter applied: return the full description
return self._description
if mode == 'statistics':
return self._properties
if not etype:
etype = self.list('etype')
if not isinstance(etype,list):
etype = [etype]
ret = dict()
for et in etype:
state_model = None
state_values = None
event_model = None
if et in self._description.get('entities', dict()):
state_model = self._description['entities'][et]['state_model']
state_values = self._description['entities'][et]['state_values']
event_model = self._description['entities'][et]['event_model']
if not state_model : state_model = dict()
if not state_values : state_values = dict()
if not event_model : event_model = dict()
if not mode:
ret[et] = {'state_model' : state_model,
'state_values' : state_values,
'event_model' : event_model}
elif mode == 'state_model':
ret[et] = {'state_model' : state_model}
elif mode == 'state_values':
ret[et] = {'state_values' : state_values}
elif mode == 'event_model':
ret[et] = {'event_model' : event_model}
if not mode or mode == 'relations':
if len(etype) != 2:
raise ValueError('relations expect an etype *tuple*')
# we interpret the query as follows: for the two given etypes, walk
# through the relationship tree and for all entities of etype[0]
# return a list of all child entities of etype[1]. The result is
# returned as a dict.
parent_uids = self._apply_filter(etype=etype[0])
child_uids = self._apply_filter(etype=etype[1])
rel = self._description.get('tree', dict())
for p in parent_uids:
ret[p] = list()
if p not in rel:
print(('inconsistent : no relations for %s' % p))
continue
for c in rel[p]['children']:
if c in child_uids:
ret[p].append(c)
return ret
# --------------------------------------------------------------------------
#
[docs] def ranges(self, state=None, event=None, time=None, collapse=True):
'''
Gets a set of initial and final conditions, and computes time ranges in
accordance to those conditions from all session entities. The resulting
set of ranges is then collapsed to the minimal equivalent set of ranges
covering the same set of times.
Please refer to the :class:`Entity.ranges` documentation on detail on
the constrain parameters.
Setting 'collapse' to 'True' (default) will prompt the method to
collapse the resulting set of ranges.
'''
ranges = list()
for uid,entity in list(self._entities.items()):
try:
ranges += entity.ranges(state, event, time, collapse=False)
except ValueError:
print(('no ranges for %s' % uid))
# ignore entities for which the conditions did not apply
# pass
if not ranges:
return []
if collapse:
ret = ru.collapse_ranges(ranges)
else:
ret = ranges
# sort ranges by start time and return
return sorted(ret, key=lambda r: r[1])
# --------------------------------------------------------------------------
#
[docs] def timestamps(self, state=None, event=None, time=None, first=False):
'''
This method accepts a set of conditions, and returns the list of
timestamps for which those conditions applied, i.e., for which state
transitions or events are known which match the given 'state' or 'event'
parameter. If no match is found, an empty list is returned.
Both `state` and `event` can be lists, in which case the union of all
timestamps are returned.
The `time` parameter is expected to be a single tuple, or a list of
tuples, each defining a pair of start and end time which are used to
constrain the matching timestamps.
If `first` is set to `True`, only the timestamps for the first matching
events (per entity) are returned.
The returned list will be sorted.
'''
ret = list()
for _,entity in list(self._entities.items()):
tmp = entity.timestamps(state=state, event=event, time=time)
if tmp and first:
ret.append(tmp[0])
else:
ret += tmp
return sorted(ret)
# --------------------------------------------------------------------------
#
def duration(self, state=None, event=None, time=None, ranges=None):
    '''
    Return the sum of the durations of a set of ranges.

    The method accepts the same parameters as the `ranges()` method and
    uses that method to obtain the (collapsed) set of ranges to sum up.
    Alternatively, the ranges can be passed in directly via `ranges` - the
    other parameters must not be set in that case, and the given ranges are
    collapsed before they are summed up.

    Example::

       session.duration(state=[rp.NEW, rp.FINAL]))

    where `rp.FINAL` is a list of final task states.
    '''

    if ranges:
        assert not state
        assert not event
        assert not time

        # make sure the ranges are collapsed (although they likely are
        # already...)
        ranges = ru.collapse_ranges(ranges)

    else:
        ranges = self.ranges(state, event, time)

    total = 0
    for r in ranges:
        total += r[1] - r[0]

    return total
# --------------------------------------------------------------------------
#
[docs] def concurrency(self, state=None, event=None, time=None, sampling=None):
'''
This method accepts the same set of parameters as the `ranges()` method,
and will use the `ranges()` method to obtain a set of ranges. It will
return a time series, counting the number of tasks which are
concurrently matching the ranges filter at any point in time.
The additional parameter `sampling` determines the exact points in time
for which the concurrency is computed, and thus determines the sampling
rate for the returned time series. If not specified, the time series
will contain all points at which the concurrency changed. If specified,
it is interpreted as second (float) interval at which, after the
starting point (begin of first event matching the filters) the
concurrency is computed.
Returned is an ordered list of tuples::
[ [time_0, concurrency_0],
[time_1, concurrency_1],
...
[time_n, concurrency_n] ]
where `time_n` is represented as `float`, and `concurrency_n` as `int`.
Example::
session.filter(etype='task').concurrency(state=[rp.AGENT_EXECUTING,
rp.AGENT_STAGING_OUTPUT_PENDING])
'''
INC = 1 # increase concurrency
DEC = -1 # decrease concurrency
ranges = list()
for _,e in list(self._entities.items()):
ranges += e.ranges(state, event, time)
if not ranges:
return []
times = list()
# get all start and end times for all ranges. The start of a range will
# increase concurrency by one at that time stamp, and the end will
# decrease it by one again.
for r in ranges:
times.append([r[0], INC])
times.append([r[1], DEC])
# sort those times
times.sort()
# get the overall concurrency data
conc = 0
data = list() # [[time, concurrency], ...]
for time, val in times:
conc += val
data.append([time, conc])
# collapse time stamps (use last value on same time stamps)
collapsed = list()
last = data[0]
for time, val in data[1:]:
if time != last[0]:
collapsed.append(last)
last = [time, val]
# append last time
collapsed.append(last)
# make sure we start at zero (at time of first event)
collapsed.insert(0, [collapsed[0][0], 0])
if not sampling:
# return as is
ret = collapsed
else:
# select data points according to sampling
# get min time, and create timestamps at regular intervals
t = times[0][0]
ret = list()
for time, val in collapsed:
while time >= t:
ret.append([t, val])
t += sampling
# append last time stamp if it is not appended, yet
if ret[-1] != [t, val]:
ret.append([t, val])
return ret
# --------------------------------------------------------------------------
#
[docs] def rate(self, state=None, event=None, time=None, sampling=None,
first=False):
'''
This method accepts the same parameters as the `timestamps()` method: it
will count all matching events and state transitions as given, and will
return a time series of the rate of how many of those events and/or
transitions occurred per second.
The additional parameter `sampling` determines the exact points in time
for which the rate is computed, and thus determines the sampling rate
for the returned time series. If not specified, the time series will
contain all points at which and event occurred, and the rate value will
only be determined by the time passed between two consecutive events.
If specified, it is interpreted as second (float) interval at which,
after the starting point (begin of first event matching the filters) the
rate is computed.
Returned is an ordered list of tuples::
[ [time_0, rate_0] ,
[time_1, rate_1] ,
...
[time_n, rate_n] ]
where `time_n` is represented as `float`, and `rate_n` as `int`.
The `time` parameter is expected to be a single tuple, or a list of
tuples, each defining a pair of start and end time which are used to
constrain the resulting time series.
The 'first' is defined, only the first matching event fir the selected
entities is considered viable.
Example::
session.filter(etype='task').rate(state=[rp.AGENT_EXECUTING])
'''
timestamps = self.timestamps(event=event, state=state, time=time,
first=first)
if not timestamps:
# nothing to do
return []
times = list()
if sampling:
# get min and max timestamp, and create sampling points at
# regular intervals
r_min = timestamps[0]
r_max = timestamps[-1]
t = r_min
while t < r_max:
times.append(t)
t += sampling
times.append(r_max)
else:
# we create an entry at all timestamps
times = timestamps[:]
if not times:
# nothing to do
return []
# we need to make sure that no two consecutive timestamps are the same,
# as that would lead to a division by zero later on
times = sorted(set(times))
# import pprint
# pprint.pprint(times)
# make sure we start in correct state, and first data point does not
# occur before sampling starts
assert timestamps[0] >= times[0]
# we have the time sequence, now compute event rate at those points
ts_idx = 0 # index into the list of timestamps
ts_len = len(timestamps) # number of timestamps
ret = list() # our rate time series
t_start = times[0] # current samplint window
t_stop = times[0]
for t in times[1:]:
t_start = t_stop # slide sampling window to next sample time
t_stop = t
cnt = 0 # reset event counter
while ts_idx < ts_len:
if timestamps[ts_idx] <= t_stop:
# timestamp is in range, count event
cnt += 1
else:
# we need to slide the sampling window
break
# go to next timestamp
ts_idx += 1
# print 'window: %f - %f (%f) : %5d' % \
# (t_start, t_stop, t_stop - t_start, cnt)
# sampling window completed - store rate of events in sample
ret.append([t_stop, cnt / (t_stop - t_start)])
return ret
# --------------------------------------------------------------------------
#
def utilization(self, metrics, rtype='cpu', udurations=None):
if self._stype != 'radical.pilot':
raise ValueError('session utilization is only available on '
'radical.pilot sessions')
import radical.pilot as rp
provided = rp.utils.get_provided_resources(self, rtype)
consumed = rp.utils.get_consumed_resources(self, rtype, udurations)
stats_abs = {'total': 0.0}
stats_rel = {'total': 100.0}
total = 0.0
for pid in provided['total']:
for box in provided['total'][pid]:
stats_abs['total'] += (box[1] - box[0]) * \
(box[3] - box[2] + 1)
total = stats_abs['total']
for metric in metrics:
if isinstance(metric, list):
name = metric[0]
parts = metric[1]
else:
name = metric
parts = [metric]
if name not in stats_abs:
stats_abs[name] = 0.0
for part in parts:
for uid in consumed[part]:
for box in consumed[part][uid]:
stats_abs[name] += (box[1] - box[0]) * \
(box[3] - box[2] + 1)
info = ''
info += '%s [%d]\n' % (self.uid, len(self.get(etype='task')))
for metric in metrics + ['total']:
if isinstance(metric, list):
name = metric[0]
parts = metric[1]
else:
name = metric
parts = ''
val = stats_abs[name]
if val == 0.0: glyph = '!'
else : glyph = ''
rel = 100.0 * val / total
stats_rel[name] = rel
info += ' %-20s: %14.3f %8.3f%% %2s %s\n' \
% (name, val, rel, glyph, parts)
have = 0.0
over = 0.0
work = 0.0
for metric in sorted(stats_abs.keys()):
if metric == 'total':
have += stats_abs[metric]
else:
if metric == 'Execution Cmd':
work += stats_abs[metric]
else:
over += stats_abs[metric]
miss = have - over - work
rel_over = 100.0 * over / total
rel_work = 100.0 * work / total
rel_miss = 100.0 * miss / total
stats_abs['Other'] = miss
stats_rel['Other'] = rel_miss
info += '\n'
info += ' %-20s: %14.3f %8.3f%%\n' % ('total', have, 100.0)
info += ' %-20s: %14.3f %8.3f%%\n' % ('over', over, rel_over)
info += ' %-20s: %14.3f %8.3f%%\n' % ('work', work, rel_work)
info += ' %-20s: %14.3f %8.3f%%\n' % ('miss', miss, rel_miss)
return provided, consumed, stats_abs, stats_rel, info
# --------------------------------------------------------------------------
#
# def utilization_bak(self, owner, consumer, resource, owner_events=None,
# consumer_events=None):
# '''
# Parms:
# owner : The entity name of the owner of the resources
# consumer : The ename of the entity that consumes the resources
# owned by owner
# resource : The type of resources whose utilization is requested,
# eg. Cores, Memory, GPUS etc.
# owner_events : A list of owner's/owners' events that will be used as
# starting and ending points for the resource ownership.
# consumer_events: A list of owner's/owners' events that will be used as
# starting and ending points for resource consumption.
# Based on these parameters the resources of the owners are collected, as
# well as, the times when the consumer(s) used those resources.
# Returned is a dictionary of the form::
# { 'owner_0': {'range' : owner_range,
# 'resources' : resource_size,
# 'utilization': [[time_0, resource_utilization_0],
# [time_1, resource_utilization_1],
# ...
# [time_n, resource_utilization_n]]},
# 'owner_1': {'range' : owner_range,
# 'resources' : resource_size,
# 'utilization': [[time_0, resource_utilization_0],
# [time_1, resource_utilization_1],
# ...
# [time_n, resource_utilization_n]]},
# ...
# 'owner_n': {'range' : owner_range,
# 'resources' : resource_size,
# 'utilization': [[time_0, resource_utilization_0],
# [time_1, resource_utilization_1],
# ...
# [time_n, resource_utilization_n]]}
# where `time_n` is represented as `float`, `resource_utilization_n` as
# `int`, and resource_size is the total resources the owner has.
# Example::
# s.utilization(owner = 'pilot',
# consumer = 'task',
# resource = 'cores',
# owner_events = [{ru.EVENT: 'bootstrap_0_start'},
# {ru.EVENT: 'bootstrap_0_stop' }])
# consumer_events= [{ru.EVENT: 'exec_start'},
# {ru.EVENT: 'exec_stop' }])
# '''
# ret = dict()
# # Filter the session to get a session of the owners. If that is empty
# # return an empty dict
# relations = self .describe('relations', [owner, consumer])
# if not relations:
# return dict()
# owners = self.filter(etype=owner, inplace=False)
# if not owners:
# return dict()
# # Filter the self to get the consumers. If none are found, return an
# # empty dict.
# #
# # FIXME: this should return an dict with zero utilization over the full
# # time range the resource exist.
# #
# for o in owners.get():
# owner_id = o.uid
# owner_resources = o.description.get(resource)
# owner_range = o.ranges(event=owner_events)
# consumers = self.filter(etype=consumer, uid=relations[owner_id],
# inplace=False)
# if not consumers:
# util = [0]
# else:
# # Go through the consumer entities and create two dictionaries.
# # The first keeps track of how many resources each consumer
# # consumes, and the second has the ranges based on the events.
# consumer_resources = dict()
# consumer_ranges = dict()
# for c in consumers.get():
# ranges = c.ranges(event=consumer_events)
# cons_id = c.uid
# resources_acquired = 0
# if resource == 'cores':
# cores = c.description['cpu_processes'] * \
# c.description['cpu_threads']
# resources_acquired += cores
# elif resource == 'gpus':
# gpus = c.description['gpu_processes']
# resources_acquired += len(gpus)
# else:
# raise ValueError('unsupported utilization resource')
# consumer_resources[cons_id] = resources_acquired
# # Update consumer_ranges if there is at least one range
# if ranges:
# consumer_ranges.update({cons_id: ranges})
# # Sort consumer_ranges based on their values. This command
# # returns a dictionary, which is sorted based on the first value
# # of each entry. In the end the key, are out of order but the
# # values are.
# consumer_ranges = sorted(iter(list(consumer_ranges.items())),
# key=lambda k_v: (k_v[1][0],k_v[0]))
# # Create a timeseries that contains all moments in consumer
# # ranges and sort. This way we have a list that has time any
# # change has happened.
# times = list()
# for cons_id,ranges in consumer_ranges:
# for r in ranges:
# times.append(r[0])
# times.append(r[1])
# times.sort()
# # we have the time sequence, now compute utilization
# # at those points
# util = list()
# for t in times:
# cnt = 0
# for cons_id,ranges in consumer_ranges:
# for r in ranges:
# if t >= r[0] and t <= r[1]:
# cnt += consumer_resources[cons_id]
# util.append([t, cnt])
# ret[owner_id] = {'range' : owner_range,
# 'resources' : owner_resources,
# 'utilization': util}
# return ret
# --------------------------------------------------------------------------
#
[docs] def consistency(self, mode=None):
'''
Performs a number of data consistency checks, and returns a set of UIDs
for entities which have been found to be inconsistent. The method
accepts a single parameter `mode` which can be a list of strings
defining what consistency checks are to be performed. Valid strings are:
- state_model: check if all entity states are in adherence to the
respective entity state model
- event_model: check if all entity events are in adherence to the
respective entity event model
- timestamps: check if events and states are recorded with correct
ordering in time.
If not specified, the method will execute all three checks.
After this method has been run, each checked entity will have more
detailed consistency information available via::
entity.consistency['state_model'] (bool)
entity.consistency['event_model'] (bool)
entity.consistency['timestamps' ] (bool)
entity.consistency['log' ] (list of strings)
The boolean values each indicate consistency of the respective test, the
`log` will contain human readable information about specific consistency
violations.
'''
# FIXME: we could move the method to the entity, so that we can check
# consistency for each entity individually.
self._rep.header('running consistency checks')
ret = list()
MODES = ['state_model', 'event_model', 'timestamps']
if not mode:
mode = MODES
if not isinstance(mode, list):
mode = [mode]
for m in mode:
if m not in MODES:
raise ValueError('unknown consistency mode %s' % m)
if 'state_model' in mode:
ret.extend(self._consistency_state_model())
return list(set(ret)) # make list unique
# --------------------------------------------------------------------------
#
[docs] def usage(self, alloc_entity, alloc_events,
block_entity, block_events,
use_entity, use_events):
'''
This method creates a dict with three entries: `alloc`, `block`, `use`.
Those three dict entries in turn have a a dict of entity IDs for all
entities which have blocks in the respective category, and foreach of
those entity IDs the dict values will be a list of rectangles.
A resource is considered:
- `alloc` (allocated) when it is owned by the RCT application;
- `block` (blocked) when it is reserveed for a specific task;
- `use` (used) when it is utilized by that task.
Each of the rectangles represents a continuous block of resources which
is alloced/blocked/used:
- x_0 time when `alloc/block/usage` begins;
- x_1 time when `alloc/block/usage` ends;
- y_0 lowest index of a continuous block of resource IDs;
- y_1 highest index of a continuous block of resource IDs.
Any specific entity (pilot, task) can have a **set** of such resource
blocks, for example, a task might be placed over multiple,
non-consecutive nodes:
- gpu and cpu resources are rendered as separate blocks (rectangles).
Args:
alloc_entity (Entity): :class:`Entity` instance which allocates
resources
alloc_events (list): event tuples which specify allocation time
block_entity (Entity): :class:`Entity` instance which blocks
resources
block_events (list): event tuples which specify blocking time
use_entity (Entity): :class:`Entity` instance which uses resources
use_events (list): event tuples which specify usage time
Example::
usage('pilot', [{ru.STATE: None, ru.EVENT: 'bootstrap_0_start'},
{ru.STATE: None, ru.EVENT: 'bootstrap_0_stop' }],
'task' , [{ru.STATE: None, ru.EVENT: 'schedule_ok' },
{ru.STATE: None, ru.EVENT: 'unschedule_stop' }],
'task' , [{ru.STATE: None, ru.EVENT: 'exec_start' },
{ru.STATE: None, ru.EVENT: 'exec_stop' }])
'''
# this is currently only supported for RP sessions, as we only know for
# pilots and tasks how to dig resource information out of session and
# entity metadata.
assert self.stype == 'radical.pilot', \
'stype %s unsupported' % self._stype
# for RP sessions, create resource indices which can be used to
# determine the y-axis values for the rectangles. This is basically
# a dict of node_names for each alloc_entity which points to two indexes
# for each node: one starting index for GPUs, and one for CPU cores
res_idx = dict()
idx = 0
for ae in self.get(etype=alloc_entity):
uid = ae.uid
nodes = ae.cfg['resource_details']['rm_info']['node_list']
cpn = ae.cfg['resource_details']['rm_info']['cores_per_node']
gpn = ae.cfg['resource_details']['rm_info']['gpus_per_node']
if ae.uid not in res_idx:
res_idx[ae.uid] = dict()
res_idx[ae.uid]['_min'] = idx
for n in nodes:
res_idx[ae.uid][n[1]] = [[idx , idx + cpn - 1],
[idx + cpn, idx + cpn + gpn - 1]]
idx += cpn
idx += gpn
res_idx[ae.uid]['_max'] = idx - 1
# ----------------------------------------------------------------------
# RP specific helper method to convert given entity information into
# a set of y-value ranges. This returns a tuple of lists where each
# list contains tuples of resource indexes (y-values)
def get_res_idx(entity):
    '''
    Translate an entity into resource index (y-value) ranges.

    Returns a two-element list `[cpu_ranges, gpu_ranges]`, where each
    element is a list of index groups:

      - for a `pilot`, one `[min, max]` pair taken from the enclosing
        `res_idx` table is placed in the first list (it is assumed to
        cover both CPU and GPU indexes), the second list is empty;
      - for a `task` without an owning pilot, both lists are empty;
      - for any other `task`, the core and gpu maps of all its slots are
        offset by the owning node's start indexes and then grouped into
        runs of consecutive values.

    For any other entity type nothing is returned (implicit `None`).
    '''

    kind = entity.etype

    if kind == 'pilot':
        # we assume min/max to cover CPU and GPU
        lower = res_idx[entity.uid]['_min']
        upper = res_idx[entity.uid]['_max']
        return [[[lower, upper]], []]

    if kind == 'task':

        # find owning pilot
        pid = entity.cfg.get('pilot')
        if not pid:
            # no resources used
            return [[], []]

        cores = list()
        gpus  = list()

        for node in entity.cfg['slots']['nodes']:
            node_uid = node['uid']
            # the node's start indexes are only looked up when there is
            # actually a core / gpu to place
            cores.extend(res_idx[pid][node_uid][0][0] + core
                         for core_set in node['core_map']
                         for core in core_set)
            gpus.extend(res_idx[pid][node_uid][1][0] + gpu
                        for gpu_set in node['gpu_map']
                        for gpu in gpu_set)

        # identify continuous groups of y-values and return
        cpu_groups = [list(grp) for grp in mit.consecutive_groups(cores)]
        gpu_groups = [list(grp) for grp in mit.consecutive_groups(gpus)]
        return [cpu_groups, gpu_groups]
# ----------------------------------------------------------------------
ret = {'alloc': dict(),
'block': dict(),
'use' : dict()}
etypes = {'alloc': {'etype' : alloc_entity,
'events': alloc_events},
'block': {'etype' : block_entity,
'events': block_events},
'use' : {'etype' : use_entity,
'events': use_events}}
for entity in self.get():
for mode in ['alloc', 'block', 'use']:
if etypes[mode]['etype'] == entity.etype:
event_list = etypes[mode]['events']
if not event_list:
continue
if not isinstance(event_list[0], list):
event_list = [event_list]
uid = entity.uid
if uid not in ret[mode]:
ret[mode][uid] = list()
for events in event_list:
assert len(events) == 2
y_values = get_res_idx(entity)
t_values = entity.ranges(event=events)
# print
# print uid
# print events
#
# import pprint
# pprint.pprint(t_values)
# pprint.pprint(y_values)
for t_range in t_values:
for y_range in y_values[0]:
ret[mode][uid].append([t_range[0], t_range[-1],
y_range[0], y_range[-1]])
for y_range in y_values[1]:
ret[mode][uid].append([t_range[0], t_range[-1],
y_range[0], y_range[-1]])
return ret
# --------------------------------------------------------------------------
#
def _consistency_state_model(self):
    '''
    Check all entities of this session against their state model.

    For each entity type, the `state_values` description (a dict which maps
    an orderable state value to a state name, or to a list of alternative
    state names) is walked in ascending order of the state value.  The
    entry with the largest state value is considered the final state.

    An entity is flagged as inconsistent if

      - a non-final state is found after an earlier state was missing, or
      - none of the final states is found.

    Missing intermediate states are tolerated when the entity jumps right
    to a final state.  Entity types without state values are not checked:
    the `state_model` consistency flag of their entities is set to `None`.

    The result per entity is stored in `e._consistency['state_model']`,
    diagnostic messages are appended to `e._consistency['log']`, and
    progress is reported via `self._rep`.

    Returns:
        list: UIDs of all entities found to be inconsistent
    '''

    ret = []  # list of inconsistent entity IDs

    for et in self.list('etype'):

        self._rep.info('%s state model\n' % et)
        sv = self.describe('state_values', etype=et)[et]['state_values']

        # The state model is defined by the order of the state *values*
        # (the dict keys), not by the dict's insertion order - so walk
        # the states sorted by key, consistently with how the final state
        # is determined.  This only depends on the entity type, so
        # prepare it once instead of once per entity.
        final_s = None
        model   = []
        if sv:
            ordered = [sv[v] for v in sorted(sv)]
            final_s = ordered[-1]
            if not isinstance(final_s, list):
                final_s = [final_s]
            # empty entries are skipped, single states become 1-elem lists
            model = [s if isinstance(s, list) else [s]
                     for s in ordered if s]

        for e in self.get(etype=et):

            es = e.states

            if not sv:
                # no state model known: nothing to check against
                if es:
                    self._rep.warn(' %-30s : %s' % (et, list(es.keys())))
                e._consistency['state_model'] = None
                continue

            self._rep.info(' %-30s :' % e.uid)

            missing  = False  # did we miss any state so far?
            sm_ok    = True
            sm_log   = []
            miss_log = []

            for s in model:

                # check if we have (any alternative of) that state
                found = next((_s for _s in s if _s in es), None)

                if not found:
                    if s == final_s:
                        # no final state? Oops
                        self._rep.error('no final state! ')
                        sm_ok = False
                        sm_log.append('missing final state')
                    else:
                        # Hmm, might be ok.  Lets see...
                        missing = True
                        self._rep.warn('*')
                        miss_log.append('missing state(s) %s' % s)
                    continue

                if missing and found not in final_s:
                    # found a state after a previous one was missing,
                    # but we are not final.  Oops
                    self._rep.warn('+')
                    sm_log.extend(miss_log)
                    miss_log = []
                    sm_ok = False
                    continue

                self._rep.ok('+')

            e._consistency['state_model'] = sm_ok
            e._consistency['log'].extend(sm_log)

            if not sm_ok:
                ret.append(e.uid)

            self._rep.plain('\n')

    return ret
# --------------------------------------------------------------------------
#
def tzero(self, t):
    '''
    Re-base all event timestamps of this session onto a new `tzero`.

    The given timestamp `t` is stored as the session's new `tzero`.  For
    every event of every entity, the previously applied `tzero` is added
    back to the event time, and the new `tzero` is then subtracted.  This
    simplifies the alignment of multiple sessions, or the focus on
    specific events.
    '''

    previous    = self._tzero
    self._tzero = t

    entities = list(self._entities.values())

    for ent in entities:
        # NOTE: per the original author, entity.states are shallow copies
        #       of the events - so only the events are touched here.
        for evt in ent.events:
            # undo the old shift first, then apply the new one
            evt[ru.TIME] += previous
            evt[ru.TIME] -= self._tzero
# ------------------------------------------------------------------------------