Inspection
RADICAL-Analytics enables deriving information about RCT sessions, pilots and tasks: for example, the session ID, the number of tasks and pilots, the final state of the tasks and pilots, and the CPU/GPU processes of each task. That information allows us to derive task requirements and resource capabilities, alongside the RCT configuration parameters used for a session.
Prologue
Load the Python modules needed to profile and plot a RADICAL-Cybertool (RCT) session.
[1]:
import os
import tarfile
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
import radical.utils as ru
import radical.pilot as rp
import radical.entk as re
import radical.analytics as ra
Load the RADICAL Matplotlib style to obtain visually consistent and publication-quality plots.
[2]:
plt.style.use(ra.get_mplstyle('radical_mpl'))
Usually, it is useful to record the stack used for the analysis.
[3]:
! radical-stack
python : /home/docs/checkouts/readthedocs.org/user_builds/radicalanalytics/envs/stable/bin/python3
pythonpath :
version : 3.9.15
virtualenv :
radical.analytics : 1.34.0-v1.34.0@HEAD-detached-at-0b58be0
radical.entk : 1.33.0
radical.gtod : 1.20.1
radical.pilot : 1.34.0
radical.saga : 1.34.0
radical.utils : 1.33.0
Single Session
Name and location of the session we profile.
[4]:
sidsbz2 = !find sessions -maxdepth 1 -type f -exec basename {} \;
sids = [s[:-8] for s in sidsbz2]
sdir = 'sessions/'
Unbzip and untar the session.
[5]:
sidbz2 = sidsbz2[0]
sid = sidbz2[:-8]
sp = sdir + sidbz2
tar = tarfile.open(sp, mode='r:bz2')
tar.extractall(path=sdir)
tar.close()
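The `[:-8]` slice used above removes the 8-character `.tar.bz2` suffix from each archive name, leaving the session ID. As a minimal sketch of the same two steps, the snippet below derives a session ID from an archive name and extracts the archive with a context manager so that the file is closed even if extraction fails. The helper names `session_id` and `extract_session` and the throwaway archive are assumptions made for illustration; they are not part of RADICAL-Analytics.

```python
import os
import tarfile
import tempfile

SUFFIX = '.tar.bz2'  # 8 characters, hence the [:-8] slice used above


def session_id(archive_name):
    """Return the session ID for an archive named '<sid>.tar.bz2'."""
    if not archive_name.endswith(SUFFIX):
        raise ValueError(f'not a {SUFFIX} archive: {archive_name}')
    return archive_name[:-len(SUFFIX)]


def extract_session(archive_path, dest_dir):
    """Extract a bzip2-compressed tarball and return the session directory."""
    with tarfile.open(archive_path, mode='r:bz2') as tar:
        tar.extractall(path=dest_dir)
    return os.path.join(dest_dir, session_id(os.path.basename(archive_path)))


# Demo on a throwaway archive so the sketch runs without real session data.
with tempfile.TemporaryDirectory() as tmp:
    sid = 'rp.session.example.0000'  # hypothetical session ID
    src = os.path.join(tmp, 'src', sid)
    os.makedirs(src)
    with open(os.path.join(src, 'dummy.prof'), 'w') as f:
        f.write('placeholder\n')

    archive = os.path.join(tmp, sid + SUFFIX)
    with tarfile.open(archive, mode='w:bz2') as tar:
        tar.add(src, arcname=sid)

    out = os.path.join(tmp, 'sessions')
    os.makedirs(out)
    extracted = extract_session(archive, out)
    extracted_ok = os.path.isfile(os.path.join(extracted, 'dummy.prof'))
```

Checking the suffix explicitly guards against files in the sessions directory that are not `.tar.bz2` archives, which a fixed-length slice would silently mangle.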
Create a ra.Session object for the session. We do not need EnTK-specific traces, so we load only the RP traces contained in the EnTK session by passing the 'radical.pilot' session type to ra.Session. We capture the output of ra.Session in capt to avoid polluting the notebook with warning messages.
[6]:
%%capture capt
sp = sdir + sid
session = ra.Session(sp, 'radical.pilot')
pilots = session.filter(etype='pilot', inplace=False)
tasks = session.filter(etype='task' , inplace=False)
Information about the session that is commonly used when analyzing and plotting one or more RCT sessions.
[7]:
# Session info
sinfo = {
    'sid'       : session.uid,
    'hostid'    : session.get(etype='pilot')[0].cfg['hostid'],
    'cores_node': session.get(etype='pilot')[0].cfg['resource_details']['rm_info']['cores_per_node'],
    'gpus_node' : session.get(etype='pilot')[0].cfg['resource_details']['rm_info']['gpus_per_node'],
    'smt'       : session.get(etype='pilot')[0].cfg['resource_details']['rm_info']['threads_per_core']
}
# Pilot info (assumes 1 pilot)
sinfo.update({
    'pid'       : pilots.list('uid'),
    'npilot'    : len(pilots.get()),
    'npact'     : len(pilots.timestamps(state='PMGR_ACTIVE')),
})
# Task info
sinfo.update({
    'ntask'     : len(tasks.get()),
    'ntdone'    : len(tasks.timestamps(state='DONE')),
    'ntcanceled': len(tasks.timestamps(state='CANCELED')),
    'ntfailed'  : len(tasks.timestamps(state='FAILED')),
})
# Derived info (assumes a single pilot)
sinfo.update({
    'pres'      : pilots.get(uid=sinfo['pid'])[0].description['resource'],
    'ncores'    : pilots.get(uid=sinfo['pid'])[0].description['cores'],
    'ngpus'     : pilots.get(uid=sinfo['pid'])[0].description['gpus']
})
sinfo.update({
    'nnodes'    : int(sinfo['ncores']/sinfo['cores_node'])
})
sinfo
[7]:
{'sid': 'rp.session.mosto.mturilli.019432.0003',
'hostid': 'mosto',
'cores_node': 64,
'gpus_node': 8,
'smt': 1,
'pid': ['pilot.0000'],
'npilot': 1,
'npact': 1,
'ntask': 2048,
'ntdone': 2048,
'ntcanceled': 0,
'ntfailed': 0,
'pres': 'local.localhost',
'ncores': 512,
'ngpus': 64,
'nnodes': 8}
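The derived `nnodes` value follows from dividing the cores requested by the pilot by the cores available on each node. As a small worked example, using only the values printed above, the same node count can be obtained from either the core or the GPU request. The helper `nodes_required` is a hypothetical name, and its use of ceiling division (rather than the truncating `int(a/b)` in the cell above) is an assumption that only matters when a request is not a whole multiple of the per-node capacity.

```python
# Values copied from the session summary printed above.
sinfo = {'cores_node': 64, 'gpus_node': 8, 'ncores': 512, 'ngpus': 64}


def nodes_required(requested, per_node):
    """Ceiling division: whole nodes needed to hold `requested` units."""
    return -(-requested // per_node)


nodes_by_cores = nodes_required(sinfo['ncores'], sinfo['cores_node'])  # 512 / 64
nodes_by_gpus = nodes_required(sinfo['ngpus'], sinfo['gpus_node'])     # 64 / 8

# For this session both resource types lead to the same node count.
consistent = nodes_by_cores == nodes_by_gpus
```

When the two counts differ, the larger one would presumably determine how many nodes a pilot needs; that interpretation is not stated in the session data and is offered only as a reading aid.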
Information about tasks that is commonly used when analyzing and plotting one or more RCT sessions. We use ra.entity.description to get each task description as a dictionary. We then select the keys of that dictionary that contain the task requirements. More keys are available, especially those about staged input/output files.
[8]:
tinfo = []
for task in tasks.get():
    # Requirements of this task, taken from its description
    treq = {
        'executable'       : task.description['executable'],
        'cpu_process_type' : task.description['cpu_process_type'],
        'cpu_processes'    : task.description['cpu_processes'],
        'cpu_thread_type'  : task.description['cpu_thread_type'],
        'cpu_threads'      : task.description['cpu_threads'],
        'gpu_process_type' : task.description['gpu_process_type'],
        'gpu_processes'    : task.description['gpu_processes'],
        'gpu_thread_type'  : task.description['gpu_thread_type'],
        'gpu_threads'      : task.description['gpu_threads']
    }
    # Increment the counter of the entry with the same requirements,
    # or add a new entry if no such entry exists yet
    for ti in tinfo:
        if all(ti[k] == v for k, v in treq.items()):
            ti['n_of_tasks'] += 1
            break
    else:
        tinfo.append({**treq, 'n_of_tasks': 1})
tinfo
[8]:
[{'executable': '/home/mturilli/github/radical.analytics/docs/source/bin/radical-pilot-hello.sh',
'cpu_process_type': '',
'cpu_processes': 0,
'cpu_thread_type': '',
'cpu_threads': 0,
'gpu_process_type': '',
'gpu_processes': 0,
'gpu_thread_type': '',
'gpu_threads': 0,
'n_of_tasks': 2048}]
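The cell above groups tasks with identical requirements and counts how many tasks fall into each group. A minimal sketch of the same idea with `collections.Counter` follows. It uses hypothetical task descriptions (plain dictionaries with a reduced set of keys) in place of `task.description`, so it runs without a RADICAL session; it illustrates the grouping technique and is not the method used by RADICAL-Analytics itself.

```python
from collections import Counter

# Subset of requirement keys, chosen here only to keep the example short.
KEYS = ('executable', 'cpu_processes', 'cpu_threads', 'gpu_processes')

# Hypothetical task descriptions standing in for task.description.
descriptions = [
    {'executable': 'a.sh', 'cpu_processes': 1, 'cpu_threads': 1, 'gpu_processes': 0},
    {'executable': 'a.sh', 'cpu_processes': 1, 'cpu_threads': 1, 'gpu_processes': 0},
    {'executable': 'b.sh', 'cpu_processes': 4, 'cpu_threads': 2, 'gpu_processes': 1},
    {'executable': 'a.sh', 'cpu_processes': 1, 'cpu_threads': 1, 'gpu_processes': 0},
]

# Turn each description into a hashable tuple and count identical tuples.
counts = Counter(tuple(d[k] for k in KEYS) for d in descriptions)

# Rebuild one dictionary per distinct requirement set, with its task count.
tinfo = [dict(zip(KEYS, combo), n_of_tasks=n) for combo, n in counts.items()]
```

Counting hashable tuples takes one pass over the tasks, whereas comparing each task against every known group grows with the number of distinct groups. For a session with a single kind of task, as above, both approaches return one entry.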
Multiple Sessions
Unbzip and untar all the sessions found in the sessions directory.
[9]:
for sid in sids:
    sp = sdir + sid + '.tar.bz2'
    tar = tarfile.open(sp, mode='r:bz2')
    tar.extractall(path=sdir)
    tar.close()
Create the session, tasks and pilots objects for each session.
[10]:
%%capture capt
ss = {}
for sid in sids:
    sp = sdir + sid
    ss[sid] = {'s': ra.Session(sp, 'radical.pilot')}
    ss[sid].update({'p': ss[sid]['s'].filter(etype='pilot', inplace=False),
                    't': ss[sid]['s'].filter(etype='task' , inplace=False)})
[11]:
for sid in sids:
    # Session info
    ss[sid].update({
        'sid'       : ss[sid]['s'].uid,
        'hostid'    : ss[sid]['s'].get(etype='pilot')[0].cfg['hostid'],
        'cores_node': ss[sid]['s'].get(etype='pilot')[0].cfg['resource_details']['rm_info']['cores_per_node'],
        'gpus_node' : ss[sid]['s'].get(etype='pilot')[0].cfg['resource_details']['rm_info']['gpus_per_node'],
        'smt'       : ss[sid]['s'].get(etype='pilot')[0].cfg['resource_details']['rm_info']['threads_per_core']
    })
    # Pilot info
    ss[sid].update({
        'pid'       : ss[sid]['p'].list('uid'),
        'npilot'    : len(ss[sid]['p'].get()),
        'npact'     : len(ss[sid]['p'].timestamps(state='PMGR_ACTIVE'))
    })
    # Task info
    ss[sid].update({
        'ntask'     : len(ss[sid]['t'].get()),
        'ntdone'    : len(ss[sid]['t'].timestamps(state='DONE')),
        'ntfailed'  : len(ss[sid]['t'].timestamps(state='FAILED')),
        'ntcanceled': len(ss[sid]['t'].timestamps(state='CANCELED'))
    })
    # Derived info (assumes a single pilot)
    ss[sid].update({
        'pres'      : ss[sid]['p'].get(uid=ss[sid]['pid'])[0].description['resource'],
        'ncores'    : ss[sid]['p'].get(uid=ss[sid]['pid'])[0].description['cores'],
        'ngpus'     : ss[sid]['p'].get(uid=ss[sid]['pid'])[0].description['gpus']
    })
    ss[sid].update({'nnodes': int(ss[sid]['ncores']/ss[sid]['cores_node'])})
For presentation purposes, we can convert the session information into a DataFrame and rename some of the columns to improve readability.
[12]:
ssinfo = []
for sid in sids:
    ssinfo.append({'session'   : sid,
                   'resource'  : ss[sid]['pres'],
                   'cores_node': ss[sid]['cores_node'],
                   'gpus_node' : ss[sid]['gpus_node'],
                   'pilots'    : ss[sid]['npilot'],
                   'ps_active' : ss[sid]['npact'],
                   'cores'     : int(ss[sid]['ncores']/ss[sid]['smt']),
                   'gpus'      : ss[sid]['ngpus'],
                   'nodes'     : ss[sid]['nnodes'],
                   'tasks'     : ss[sid]['ntask'],
                   't_done'    : ss[sid]['ntdone'],
                   't_failed'  : ss[sid]['ntfailed']})
df_info = pd.DataFrame(ssinfo)
df_info
[12]:
| | session | resource | cores_node | gpus_node | pilots | ps_active | cores | gpus | nodes | tasks | t_done | t_failed |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | rp.session.mosto.mturilli.019432.0003 | local.localhost | 64 | 8 | 1 | 1 | 512 | 64 | 8 | 2048 | 2048 | 0 |
| 1 | rp.session.mosto.mturilli.019432.0004 | local.localhost | 64 | 8 | 1 | 1 | 1024 | 128 | 16 | 2048 | 2048 | 0 |
| 2 | rp.session.mosto.mturilli.019432.0002 | local.localhost | 64 | 8 | 1 | 1 | 256 | 32 | 4 | 2048 | 2048 | 0 |
| 3 | rp.session.mosto.mturilli.019432.0005 | local.localhost | 64 | 8 | 1 | 1 | 2048 | 256 | 32 | 2048 | 2048 | 0 |
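The rows above follow the order in which `find` returned the archives, which is not sorted. As a minimal sketch, the snippet below orders the sessions by pilot size and checks that every task ended as done or failed. It uses plain dictionaries holding a few values copied from the table, so it runs without pandas; with the DataFrame itself, `df_info.sort_values('cores')` should give the same ordering. The variable names are illustrative only, and canceled tasks are ignored because the table does not report them.

```python
# A few columns copied from the table above, one dictionary per session.
rows = [
    {'session': 'rp.session.mosto.mturilli.019432.0003', 'cores': 512,
     'tasks': 2048, 't_done': 2048, 't_failed': 0},
    {'session': 'rp.session.mosto.mturilli.019432.0004', 'cores': 1024,
     'tasks': 2048, 't_done': 2048, 't_failed': 0},
    {'session': 'rp.session.mosto.mturilli.019432.0002', 'cores': 256,
     'tasks': 2048, 't_done': 2048, 't_failed': 0},
    {'session': 'rp.session.mosto.mturilli.019432.0005', 'cores': 2048,
     'tasks': 2048, 't_done': 2048, 't_failed': 0},
]

# Order sessions from the smallest to the largest pilot.
by_cores = sorted(rows, key=lambda r: r['cores'])
order = [r['session'][-4:] for r in by_cores]  # last 4 digits of each ID

# Every task is accounted for as either done or failed.
all_accounted = all(r['t_done'] + r['t_failed'] == r['tasks'] for r in rows)
```

Sorting by `cores` is convenient for scaling plots, where sessions are usually shown from the smallest to the largest resource allocation.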
We can then derive task information for each session.
[13]:
tsinfo = {}
for sid in sids:
    tsinfo[sid] = []
    # Iterate over the tasks of this session
    for task in ss[sid]['t'].get():
        treq = {
            'executable'       : task.description['executable'],
            'cpu_process_type' : task.description['cpu_process_type'],
            'cpu_processes'    : task.description['cpu_processes'],
            'cpu_thread_type'  : task.description['cpu_thread_type'],
            'cpu_threads'      : task.description['cpu_threads'],
            'gpu_process_type' : task.description['gpu_process_type'],
            'gpu_processes'    : task.description['gpu_processes'],
            'gpu_thread_type'  : task.description['gpu_thread_type'],
            'gpu_threads'      : task.description['gpu_threads']
        }
        # Increment the counter of the entry with the same requirements,
        # or add a new entry if no such entry exists yet
        for ti in tsinfo[sid]:
            if all(ti[k] == v for k, v in treq.items()):
                ti['n_of_tasks'] += 1
                break
        else:
            tsinfo[sid].append({**treq, 'n_of_tasks': 1})
tsinfo
[13]:
{'rp.session.mosto.mturilli.019432.0003': [{'executable': '/home/mturilli/github/radical.analytics/docs/source/bin/radical-pilot-hello.sh',
'cpu_process_type': '',
'cpu_processes': 0,
'cpu_thread_type': '',
'cpu_threads': 0,
'gpu_process_type': '',
'gpu_processes': 0,
'gpu_thread_type': '',
'gpu_threads': 0,
'n_of_tasks': 2048}],
'rp.session.mosto.mturilli.019432.0004': [{'executable': '/home/mturilli/github/radical.analytics/docs/source/bin/radical-pilot-hello.sh',
'cpu_process_type': '',
'cpu_processes': 0,
'cpu_thread_type': '',
'cpu_threads': 0,
'gpu_process_type': '',
'gpu_processes': 0,
'gpu_thread_type': '',
'gpu_threads': 0,
'n_of_tasks': 2048}],
'rp.session.mosto.mturilli.019432.0002': [{'executable': '/home/mturilli/github/radical.analytics/docs/source/bin/radical-pilot-hello.sh',
'cpu_process_type': '',
'cpu_processes': 0,
'cpu_thread_type': '',
'cpu_threads': 0,
'gpu_process_type': '',
'gpu_processes': 0,
'gpu_thread_type': '',
'gpu_threads': 0,
'n_of_tasks': 2048}],
'rp.session.mosto.mturilli.019432.0005': [{'executable': '/home/mturilli/github/radical.analytics/docs/source/bin/radical-pilot-hello.sh',
'cpu_process_type': '',
'cpu_processes': 0,
'cpu_thread_type': '',
'cpu_threads': 0,
'gpu_process_type': '',
'gpu_processes': 0,
'gpu_thread_type': '',
'gpu_threads': 0,
'n_of_tasks': 2048}]}
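In the output above, all four sessions report the same single group of 2048 tasks. One possible way to verify this programmatically, rather than by reading the dictionary, is to group sessions by a fingerprint of their task requirements. The sketch below uses a hypothetical, shortened stand-in for `tsinfo`; `workload_key` is an illustrative helper, not a RADICAL-Analytics function.

```python
# Hypothetical stand-in for `tsinfo`: session ID -> list of requirement groups.
tsinfo = {
    'session.a': [{'executable': 'hello.sh', 'cpu_processes': 0, 'n_of_tasks': 2048}],
    'session.b': [{'executable': 'hello.sh', 'cpu_processes': 0, 'n_of_tasks': 2048}],
    'session.c': [{'executable': 'other.sh', 'cpu_processes': 1, 'n_of_tasks': 16}],
}


def workload_key(groups):
    """Hashable, order-independent fingerprint of a session's task groups."""
    return frozenset(tuple(sorted(g.items())) for g in groups)


# Collect the session IDs that share the same fingerprint.
by_workload = {}
for sid, groups in tsinfo.items():
    by_workload.setdefault(workload_key(groups), []).append(sid)

# One list of session IDs per distinct workload.
distinct = sorted(by_workload.values())
```

If `distinct` contains a single list, every session executed the same workload, which is usually a precondition for comparing sessions in a scaling analysis.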