import sys
import pprint
import radical.utils as ru
# ------------------------------------------------------------------------------
#
class Entity(object):
    """
    A single entity of an RA session, built from its profile events.

    The entity keeps the time-sorted list of its events and a map of the
    state transitions found among them, and offers time based queries on
    those (`timestamps()`, `ranges()`, `duration()`).
    """

    def __init__(self, _uid, _profile, _details):
        """
        Args:
            uid (`str`): an ID assumed to be unique in the scope of an RA
                Session
            profile: a list of profile events for this entity
            details: a dictionary of complementary information on this entity
        """

        assert _uid
        assert _profile

        self._uid         = _uid
        self._details     = _details
        self._etype       = self._details.get('etype')
        self._description = self._details.get('description', dict())
        self._cfg         = self._details.get('cfg',         dict())
        self._resources   = self._details.get('resources',   dict())

        # if have no etype tree information, guess the etype from uid
        if not self._etype:
            self._etype = self._uid.split('.')[0]

        # FIXME: this should be sorted out on RP level
        # NOTE:  when `details` carries a `cfg` dict, that dict is updated in
        #        place here.
        self._cfg['hostid'] = self._details.get('hostid')

        # entities for which we have no tree information are raptor tasks (they
        # were created by the master and never saw the client side)
        # FIXME: this should be sorted out on RP level
        if not self._etype:
            if not self._details and 'task' in self._uid:
                self._etype = 'raptor.task'
            else:
                self._etype = 'unknown'

        self._states      = dict()
        self._events      = list()
        self._consistency = {'log'         : list(),
                             'state_model' : None,
                             'event_model' : None,
                             'timestamps'  : None}

        self._t_start     = None
        self._t_stop      = None
        self._ttc         = None

        self._initialize(_profile)

    # --------------------------------------------------------------------------
    #
    def __getstate__(self):
        """Return the entity's attributes as a plain dict (pickle support)."""

        return {
            'uid'         : self._uid,
            'etype'       : self._etype,
            'details'     : self._details,
            'description' : self._description,
            'resources'   : self._resources,
            'cfg'         : self._cfg,
            'states'      : self._states,
            'events'      : self._events,
            'consistency' : self._consistency,
            't_start'     : self._t_start,
            't_stop'      : self._t_stop,
            'ttc'         : self._ttc,
        }

    # --------------------------------------------------------------------------
    #
    def __setstate__(self, state):
        """Restore the entity from a dict created by `__getstate__()`."""

        self._uid         = state['uid']
        self._etype       = state['etype']
        self._details     = state['details']
        self._description = state['description']
        self._resources   = state['resources']
        self._cfg         = state['cfg']
        self._states      = state['states']
        self._events      = state['events']
        self._consistency = state['consistency']
        self._t_start     = state['t_start']
        self._t_stop      = state['t_stop']
        self._ttc         = state['ttc']

    # --------------------------------------------------------------------------
    #
    @property
    def t_start(self):
        return self._t_start

    @property
    def t_stop(self):
        return self._t_stop

    @property
    def ttc(self):
        return self._ttc

    @property
    def t_range(self):
        return [self._t_start, self._t_stop]

    @property
    def uid(self):
        return self._uid

    @property
    def etype(self):
        return self._etype

    @property
    def states(self):
        return self._states

    @property
    def description(self):
        return self._description

    @property
    def resources(self):
        return self._resources

    @property
    def cfg(self):
        return self._cfg

    @property
    def events(self):
        return self._events

    @property
    def consistency(self):
        return self._consistency

    # --------------------------------------------------------------------------
    #
    def __str__(self):

        return "ra.Entity [%s]: %s\n states: %s" \
                % (self.etype, self.uid, list(self._states.keys()))

    # --------------------------------------------------------------------------
    #
    def __repr__(self):

        return str(self)

    # --------------------------------------------------------------------------
    #
    def _initialize(self, profile):
        """
        Fill `self._events` and `self._states` from the given profile, and
        derive `t_start`, `t_stop` and `ttc` from the event timestamps.

        Events are stored sorted by time.  Events named 'advance' are renamed
        to 'state' in place (so profile events must be mutable), and every
        'state' event is additionally registered under its state name, where
        a later event for the same state replaces an earlier one.
        """

        # only call once
        assert not self._states
        assert not self._events

        # we expect each event tuple to have `time` and `event`, and expect
        # 'advance' events to signify a state transition.
        ordered = sorted(profile, key=lambda ev: ev[ru.TIME])

        for event in ordered:

            # FIXME: this should be phased out
            # NOTE:  compare for equality: a substring test (`in 'advance'`)
            #        would also rename events with empty or partial names.
            if event[ru.EVENT] == 'advance':
                event[ru.EVENT] = 'state'

            if event[ru.EVENT] == 'state':
                self._states[event[ru.STATE]] = event

            # we also treat state transitions as generic event.
            # Because, why not?
            self._events.append(event)

        # the events are time sorted, so the first and last one define the
        # time bounds - no float sentinels needed (`sys.float_info.min` is the
        # smallest *positive* float and breaks for timestamps <= 0).
        if ordered:
            self._t_start = ordered[0][ru.TIME]
            self._t_stop  = ordered[-1][ru.TIME]
            self._ttc     = self._t_stop - self._t_start

        # FIXME: assert state model adherence here (if state model is defined)

    # --------------------------------------------------------------------------
    #
    @staticmethod
    def _to_event_tuple(event):
        """
        Convert an event given as dict (profile field index -> value) into
        a full event tuple where unset fields are `None`.  Anything which is
        not a dict is returned unchanged.
        """

        if not isinstance(event, dict):
            return event

        et = ru.PROF_KEY_MAX * [None]
        for k, v in event.items():
            et[k] = v

        return tuple(et)

    # --------------------------------------------------------------------------
    #
    @staticmethod
    def _as_time_windows(time):
        """
        Normalize a `time` filter into a list of `[start, stop]` windows.

        A single window (`[start, stop]` or `(start, stop)`) is wrapped into
        a list, a list of windows is returned as list, and an unset filter
        results in an empty list.
        """

        if not time:
            return []

        if not isinstance(time[0], (list, tuple)):
            return [time]

        return list(time)

    # --------------------------------------------------------------------------
    #
    def _ensure_tuplelist(self, events):
        """
        Return the given event specifier(s) as list in which all dict shaped
        events are converted to event tuples.  A single specifier is wrapped
        into a list, an unset specifier results in an empty list.
        """

        if not events:
            return []

        if not isinstance(events, list):
            events = [events]

        return [self._to_event_tuple(e) for e in events]

    # --------------------------------------------------------------------------
    #
    def as_dict(self):
        """Return the entity's main attributes as a dict."""

        return {
            'uid'        : self._uid,
            'etype'      : self._etype,
            'states'     : self._states,
            'events'     : self._events,
            'cfg'        : self._cfg,
            'resources'  : self._resources,
            'description': self._description,
        }

    # --------------------------------------------------------------------------
    #
    def dump(self):
        """Pretty-print the dict representation of this entity."""

        pprint.pprint(self.as_dict())

    # --------------------------------------------------------------------------
    #
    def list_states(self):
        """Return the names of all states recorded for this entity."""

        return list(self._states.keys())

    # --------------------------------------------------------------------------
    #
    def duration(self, state=None, event=None, time=None, ranges=None):
        """
        This method accepts a set of initial and final conditions, interprets
        them as documented in the `ranges()` method (which has the same
        signature), and then returns the sum of the lengths of the resulting
        time ranges.

        Alternatively, a precomputed list of `ranges` can be passed, in which
        case `state`, `event` and `time` must not be set.  Those ranges are
        collapsed before they are summed up.

        Raises:
            ValueError: no range matches the given constraints
        """

        if not ranges:
            ranges = self.ranges(state, event, time)

        else:
            assert not state
            assert not event
            assert not time

            # make sure the ranges are collapsed (although they likely are
            # already...)
            ranges = ru.collapse_ranges(ranges)

        if not ranges:
            raise ValueError('no duration defined for given constraints '
                             '(%s) (%s) (%s) (%s)' % (state, event, time, ranges))

        return sum(r[1] - r[0] for r in ranges)

    # --------------------------------------------------------------------------
    #
    def timestamps(self, state=None, event=None, time=None):
        """
        This method accepts a set of conditions, and returns the list of
        timestamps for which those conditions applied, i.e., for which state
        transitions or events are known which match the given 'state' or 'event'
        parameter.  If no match is found, an empty list is returned.  If
        neither `state` nor `event` are given, the timestamps of all events
        are considered.

        Both `state` and `event` can be lists, in which case the union of all
        timestamps are returned.

        The `time` parameter is expected to be a single tuple, or a list of
        tuples, each defining a pair of start and end time which are used to
        constrain the matching timestamps (bounds are inclusive).

        The returned list will be sorted.
        """

        event = self._ensure_tuplelist(event)
        state = ru.as_list(state)

        if not event and not state:
            # no filters, consider all events.  Collect the timestamps into
            # a new list so that `self._events` is never exposed or changed.
            ret = [x[ru.TIME] for x in self._events]

        else:
            ret = list()

            for e in event:
                for x in self._events:
                    if self._match_event(e, x):
                        ret.append(x[ru.TIME])

            for s in state:
                if s in self._states:
                    ret.append(self._states[s][ru.TIME])

        # apply time filters: keep timestamps which fall into any window
        windows = self._as_time_windows(time)
        if windows:
            ret = [t for t in ret
                     if any(w[0] <= t <= w[1] for w in windows)]

        return sorted(ret)

    # --------------------------------------------------------------------------
    #
    def _match_event(self, needle, hay):
        """
        Check if the event `hay` matches the event filter `needle`.

        Fields which are `None` in `needle` are ignored.  The `ru.MSG` field
        matches if the needle's value is contained in the event's message
        (an event without message never matches a message filter), all other
        fields must be equal.  The last profile field (index
        `ru.PROF_KEY_MAX - 1`) is not compared.
        """

        for key in range(ru.PROF_KEY_MAX - 1):

            wanted = needle[key]
            if wanted is None:
                continue

            if key == ru.MSG:
                found = hay[key]
                if found is None or wanted not in found:
                    return False

            elif wanted != hay[key]:
                return False

        return True

    # --------------------------------------------------------------------------
    #
    def _conditions(self, states, events):
        """
        Convert state names and event specifiers into one list of event
        filter tuples as understood by `_match_event()`.  Single values are
        accepted in place of lists.
        """

        if not isinstance(states, list): states = [states]
        if not isinstance(events, list): events = [events]

        conds = list()

        # a state condition is a filter for the respective 'state' event
        for s in states:
            et = ru.PROF_KEY_MAX * [None]
            et[ru.STATE] = s
            et[ru.EVENT] = 'state'
            conds.append(tuple(et))

        for e in events:
            conds.append(self._to_event_tuple(e))

        return conds

    # --------------------------------------------------------------------------
    #
    def ranges(self, state=None, event=None, time=None,
                     expand=False, collapse=True):
        """
        This method accepts a set of initial and final conditions, in the form
        of range of state and or event specifiers::

          entity.ranges(state=[['INITIAL_STATE_1', 'INITIAL_STATE_2'],
                               ['FINAL_STATE_1',   'FINAL_STATE_2'  ]],
                        event=[[initial_event_1,   initial_event_2  ],
                               [final_event_1,     final_event_2    ]],
                        time =[[2.0, 2.5], [3.0, 3.5]])

        More specifically, the `state` and `event` parameter are expected to be
        a tuple, where the first element defines the initial condition, and the
        second element defines the final condition.  The `time` parameter is
        expected to be a single tuple, or a list of tuples, each defining a pair
        of start and end time which are used to constrain the resulting ranges.
        States are expected as strings, events as full event tuples::

            [ru.TIME, ru.NAME, ru.UID, ru.STATE, ru.EVENT, ru.MSG, ru.ENTITY]

        where empty fields are not applied in the filtering - all other fields
        must match exactly.  The events can also be specified as dictionaries,
        which then don't need to have all fields set.

        The method will:

          - determine the *earliest* timestamp when any of the given initial
            conditions have been met, which can be either an event or a state;
          - determine the *next* timestamp when any of the given final
            conditions have been met (when `expand` is set to `False` [default])
            OR
          - determine the *last* timestamp when any of the given final
            conditions have been met (when `expand` is set to `True`)

        From that final point in time the search for the next initial condition
        applies again, which may result in another time range to be found.  The
        method returns the set of found ranges, as a list of `[start, end]` time
        tuples.

        The resulting ranges are constrained by the `time` constraints, if such
        are given.

        Note that with `expand=True`, at most one range will be found.

        Setting 'collapse' to 'True' (default) will prompt the method to
        collapse the resulting set of ranges.

        The returned ranges are time-sorted

        Example::

           task.ranges(state=[rp.NEW, rp.FINAL])
           task.ranges(event=[{ru.NAME : 'exec_start'},
                              {ru.NAME : 'exec_ok'}])

        Raises:
            ValueError: neither `state` nor `event` are given
        """

        # NOTE: this method relies on all state changes (as events in
        #       `self.states`) to also be recorded as events (as events in
        #       `self.events` with `ru.EVENT == 'state'`).

        if not state and not event:
            raise ValueError('duration needs state and/or event arguments')

        event = self._ensure_tuplelist(event)

        if not state: state = [[], []]
        if not event: event = [[], []]

        s_init,  e_init  = state[0], event[0]
        s_final, e_final = state[1], event[1]

        conds_init  = self._conditions(s_init,  e_init)
        conds_final = self._conditions(s_final, e_final)

        ranges     = list()
        this_range = [None, None]

        # NOTE: this assumes that `self.events` are time sorted
        for e in self._events:

            if this_range[0] is None:
                # check for an initial event.
                if any(self._match_event(c, e) for c in conds_init):
                    this_range[0] = e[ru.TIME]

            elif any(self._match_event(c, e) for c in conds_final):
                # found a final event.  If `not expand`, then store the now
                # completed range away, and start a new one; if `expand`,
                # then keep searching for a later final event
                this_range[1] = e[ru.TIME]
                if not expand:
                    ranges.append(this_range)
                    this_range = [None, None]

        # we went through all events.  `this_range` may or may not be a usable
        # range here.  If it is, append it to ranges.
        if this_range[0] is not None and this_range[1] is not None:
            ranges.append(this_range)

        # apply time filter, if specified
        # For all ranges, check if they fall completely or partially within any
        # of the given time filters.  If not, drop that range, if yes, include
        # the overlapping part.
        windows = self._as_time_windows(time)

        if not windows:
            ret = ranges

        else:
            ret = list()
            for erange in ranges:
                for trange in windows:
                    new_start = max(trange[0], erange[0])
                    new_stop  = min(trange[1], erange[1])
                    if new_stop > new_start:
                        ret.append([new_start, new_stop])

        if collapse:
            ret = ru.collapse_ranges(ret)

        return sorted(ret)
# ------------------------------------------------------------------------------