Inspection

RADICAL-Analytics enables deriving information about RCT sessions, pilots and tasks: for example, the session ID, the number of tasks and pilots, the final state of tasks and pilots, and the CPU/GPU processes of each task. That information can be used to derive task requirements and resource capabilities, alongside the RCT configuration parameters used for a session.

Prologue

Load the Python modules needed to profile and plot a RADICAL-Cybertool (RCT) session.

[1]:
import os
import tarfile

import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker

import radical.utils as ru
import radical.pilot as rp
import radical.entk as re
import radical.analytics as ra

Load the RADICAL Matplotlib style to obtain visually consistent and publishable-quality plots.

[2]:
plt.style.use(ra.get_mplstyle('radical_mpl'))

Usually, it is useful to record the stack used for the analysis.

Note: The analysis stack might be different from the stack used to create the session to analyze. Usually, the two stacks must have the same minor release number (Major.Minor.Patch) in order to be compatible.
[3]:
! radical-stack

  python               : /home/docs/checkouts/readthedocs.org/user_builds/radicalanalytics/envs/latest/bin/python3
  pythonpath           :
  version              : 3.9.17
  virtualenv           :

  radical.analytics    : 1.35.0-v1.34.0-4-g213a18f@HEAD-detached-at-origin-devel
  radical.entk         : 1.36.0
  radical.gtod         : 1.20.1
  radical.pilot        : 1.36.0
  radical.saga         : 1.36.0
  radical.utils        : 1.33.0
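
The compatibility rule stated in the note above can be checked programmatically. The following is a minimal sketch and not part of RADICAL-Analytics: same_minor is a hypothetical helper that compares only the leading Major.Minor fields of two version strings, such as those printed by radical-stack.

```python
import re


def same_minor(v1, v2):
    """Return True if two version strings share the same Major.Minor prefix.

    Hypothetical helper: it inspects only the leading 'Major.Minor' fields
    and ignores the patch level and any development suffix.
    """
    def major_minor(version):
        match = re.match(r'(\d+)\.(\d+)', version)
        if match is None:
            raise ValueError(f'cannot parse version: {version!r}')
        return int(match.group(1)), int(match.group(2))

    return major_minor(v1) == major_minor(v2)


# Version strings copied from the radical-stack output above.
ra_version = '1.35.0-v1.34.0-4-g213a18f@HEAD-detached-at-origin-devel'
rp_version = '1.36.0'

print(same_minor(rp_version, '1.36.0'))
print(same_minor(ra_version, rp_version))
```

The sketch compares two arbitrary version strings; which pairs of stack components actually need to match is left to the reader.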

Single Session

Name and location of the session we profile.

[4]:
sidsbz2 = !find sessions -maxdepth 1 -type f -exec basename {} \;
sids = [s[:-8] for s in sidsbz2]
sdir = 'sessions/'
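
The slice s[:-8] in the cell above drops the 8 characters of the '.tar.bz2' extension from each archive name. As a sketch of a more defensive variant, the hypothetical helper strip_suffix below removes the extension only when it is actually present, so that unrelated files in the directory are not truncated.

```python
def strip_suffix(name, suffix='.tar.bz2'):
    """Return name without a trailing suffix; leave other names unchanged."""
    if name.endswith(suffix):
        return name[:-len(suffix)]
    return name


# Archive name built from a session ID shown later in this notebook.
archives = ['rp.session.mosto.mturilli.019432.0005.tar.bz2']
session_ids = [strip_suffix(a) for a in archives]
print(session_ids)
```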

Unbzip and untar the session.

[5]:
sidbz2 = sidsbz2[0]
sid = sidbz2[:-8]
sp  = sdir + sidbz2

tar = tarfile.open(sp, mode='r:bz2')
tar.extractall(path=sdir)
tar.close()
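
The same extraction can be written with a context manager, which closes the archive even if extraction raises an exception. The sketch below is self-contained: it first builds a small stand-in archive in a temporary directory, and the names demo.session and demo.prof are placeholders, not real RCT files.

```python
import os
import tarfile
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    # Build a small stand-in session directory (placeholder names).
    src = os.path.join(tmp, 'demo.session')
    os.mkdir(src)
    with open(os.path.join(src, 'demo.prof'), 'w') as f:
        f.write('placeholder\n')

    # Pack it as a bzip2-compressed tarball.
    archive = os.path.join(tmp, 'demo.session.tar.bz2')
    with tarfile.open(archive, mode='w:bz2') as tar:
        tar.add(src, arcname='demo.session')

    # Extract it; the 'with' block closes the archive automatically.
    out = os.path.join(tmp, 'out')
    with tarfile.open(archive, mode='r:bz2') as tar:
        tar.extractall(path=out)

    extracted = sorted(os.listdir(os.path.join(out, 'demo.session')))

print(extracted)
```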

Create a ra.Session object for the session. We do not need EnTK-specific traces, so we load only the RP traces contained in the EnTK session by passing the 'radical.pilot' session type to ra.Session.

Warning: We already know we need information about pilots and tasks. Thus, we save in memory two session objects filtered for pilots and tasks. This might be too expensive with large sessions, depending on the amount of memory available.
Note: We save the output of ra.Session in capt to avoid polluting the notebook with warning messages.
[6]:
%%capture capt

sp = sdir + sid

session = ra.Session(sp, 'radical.pilot')
pilots  = session.filter(etype='pilot', inplace=False)
tasks   = session.filter(etype='task' , inplace=False)

Information about the session that is commonly used when analyzing and plotting one or more RCT sessions.

[7]:
# Session info
sinfo = {
    'sid'       : session.uid,
    'hostid'    : session.get(etype='pilot')[0].cfg['hostid'],
    'cores_node': session.get(etype='pilot')[0].cfg['resource_details']['rm_info']['cores_per_node'],
    'gpus_node' : session.get(etype='pilot')[0].cfg['resource_details']['rm_info']['gpus_per_node'],
    'smt'       : session.get(etype='pilot')[0].cfg['resource_details']['rm_info']['threads_per_core']
}

# Pilot info (assumes 1 pilot)
sinfo.update({
    'pid'       : pilots.list('uid'),
    'npilot'    : len(pilots.get()),
    'npact'     : len(pilots.timestamps(state='PMGR_ACTIVE')),
})

# Task info
sinfo.update({
    'ntask'     : len(tasks.get()),
    'ntdone'    : len(tasks.timestamps(state='DONE')),
    'ntcanceled': len(tasks.timestamps(state='CANCELED')),
    'ntfailed'  : len(tasks.timestamps(state='FAILED')),
})

# Derive info (assume a single pilot)
sinfo.update({
    'pres'      : pilots.get(uid=sinfo['pid'])[0].description['resource'],
    'ncores'    : pilots.get(uid=sinfo['pid'])[0].description['cores'],
    'ngpus'     : pilots.get(uid=sinfo['pid'])[0].description['gpus']
})
sinfo.update({
    'nnodes'    : int(sinfo['ncores']/sinfo['cores_node'])
})

sinfo
[7]:
{'sid': 'rp.session.mosto.mturilli.019432.0005',
 'hostid': 'mosto',
 'cores_node': 64,
 'gpus_node': 8,
 'smt': 1,
 'pid': ['pilot.0000'],
 'npilot': 1,
 'npact': 1,
 'ntask': 2048,
 'ntdone': 2048,
 'ntcanceled': 0,
 'ntfailed': 0,
 'pres': 'local.localhost',
 'ncores': 2048,
 'ngpus': 256,
 'nnodes': 32}
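
The derived value nnodes follows from dividing the pilot's cores by the cores per node. As a worked example with the values printed above, the sketch below repeats that arithmetic and cross-checks it against the GPU counts. The cross-check is an added sanity test, not something the notebook performs.

```python
# Values copied from the sinfo output above.
ncores, ngpus = 2048, 256
cores_node, gpus_node = 64, 8

# Number of nodes implied by the requested cores.
nnodes = ncores // cores_node

# Sanity checks: cores fill whole nodes, and GPUs imply the same node count.
assert ncores % cores_node == 0
assert nnodes * gpus_node == ngpus

print(nnodes)  # 32
```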

Information about tasks that is commonly used when analyzing and plotting one or more RCT sessions.

Note: we use ra.entity.description to get each task description as a dictionary. We then select the keys of that dictionary that contain the task requirements. More keys are available, especially those about staged input/output files.
[8]:
tinfo = []
for task in tasks.get():

    treq = {
        'executable'       : task.description['executable'],
        'cpu_process_type' : task.description['cpu_process_type'],
        'cpu_processes'    : task.description['cpu_processes'],
        'cpu_thread_type'  : task.description['cpu_thread_type'],
        'cpu_threads'      : task.description['cpu_threads'],
        'gpu_process_type' : task.description['gpu_process_type'],
        'gpu_processes'    : task.description['gpu_processes'],
        'gpu_thread_type'  : task.description['gpu_thread_type'],
        'gpu_threads'      : task.description['gpu_threads']
    }

    # Increment the counter of a matching set of requirements, if any;
    # otherwise record the new set of requirements with a count of one.
    for ti in tinfo:
        if all(ti[k] == v for k, v in treq.items()):
            ti['n_of_tasks'] += 1
            break
    else:
        treq['n_of_tasks'] = 1
        tinfo.append(treq)
tinfo
[8]:
[{'executable': '/home/mturilli/github/radical.analytics/docs/source/bin/radical-pilot-hello.sh',
  'cpu_process_type': '',
  'cpu_processes': 0,
  'cpu_thread_type': '',
  'cpu_threads': 0,
  'gpu_process_type': '',
  'gpu_processes': 0,
  'gpu_thread_type': '',
  'gpu_threads': 0,
  'n_of_tasks': 2048}]
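
An alternative way to count tasks that share the same requirements is collections.Counter from the standard library. The sketch below is an illustration under stated assumptions: count_requirements is a hypothetical helper, the descriptions are made-up stand-ins for task.description dictionaries, and only a subset of the keys is used to keep the example short.

```python
from collections import Counter

# Subset of the requirement keys used in the cell above.
KEYS = ('executable', 'cpu_processes', 'cpu_threads', 'gpu_processes')


def count_requirements(descriptions, keys=KEYS):
    """Group task descriptions by their requirement values and count them."""
    counts = Counter(tuple(d[k] for k in keys) for d in descriptions)
    return [dict(zip(keys, combo), n_of_tasks=n)
            for combo, n in counts.items()]


# Made-up stand-ins for task descriptions.
descriptions = [
    {'executable': 'a.sh', 'cpu_processes': 1, 'cpu_threads': 1, 'gpu_processes': 0},
    {'executable': 'b.sh', 'cpu_processes': 4, 'cpu_threads': 2, 'gpu_processes': 1},
    {'executable': 'a.sh', 'cpu_processes': 1, 'cpu_threads': 1, 'gpu_processes': 0},
]

grouped = count_requirements(descriptions)
for entry in grouped:
    print(entry)
```

Because Counter hashes each combination of values, the count takes a single pass over the tasks. The values must be hashable, which holds for the strings and integers used here.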

Multiple Sessions

Unbzip and untar all the sessions found in the sessions directory.

[9]:
for sid in sids:
    sp = sdir + sid + '.tar.bz2'
    tar = tarfile.open(sp, mode='r:bz2')
    tar.extractall(path=sdir)
    tar.close()

Create the session, tasks and pilots objects for each session.

[10]:
%%capture capt

ss = {}
for sid in sids:
    sp = sdir + sid
    ss[sid] = {'s': ra.Session(sp, 'radical.pilot')}
    ss[sid].update({'p': ss[sid]['s'].filter(etype='pilot', inplace=False),
                    't': ss[sid]['s'].filter(etype='task' , inplace=False)})
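
Collect, for each session, the same session, pilot and task information gathered for the single session above.

[11]: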
for sid in sids:
    ss[sid].update({'sid'       : ss[sid]['s'].uid,
                    'hostid'    : ss[sid]['s'].get(etype='pilot')[0].cfg['hostid'],
                    'cores_node': ss[sid]['s'].get(etype='pilot')[0].cfg['resource_details']['rm_info']['cores_per_node'],
                    'gpus_node' : ss[sid]['s'].get(etype='pilot')[0].cfg['resource_details']['rm_info']['gpus_per_node'],
                    'smt'       : ss[sid]['s'].get(etype='pilot')[0].cfg['resource_details']['rm_info']['threads_per_core']
    })

    ss[sid].update({
                    'pid'       : ss[sid]['p'].list('uid'),
                    'npilot'    : len(ss[sid]['p'].get()),
                    'npact'     : len(ss[sid]['p'].timestamps(state='PMGR_ACTIVE'))
    })

    ss[sid].update({
                    'ntask'     : len(ss[sid]['t'].get()),
                    'ntdone'    : len(ss[sid]['t'].timestamps(state='DONE')),
                    'ntfailed'  : len(ss[sid]['t'].timestamps(state='FAILED')),
                    'ntcanceled': len(ss[sid]['t'].timestamps(state='CANCELED'))
    })


    ss[sid].update({'pres'      : ss[sid]['p'].get(uid=ss[sid]['pid'])[0].description['resource'],
                    'ncores'    : ss[sid]['p'].get(uid=ss[sid]['pid'])[0].description['cores'],
                    'ngpus'     : ss[sid]['p'].get(uid=ss[sid]['pid'])[0].description['gpus']
    })

    ss[sid].update({'nnodes'    : int(ss[sid]['ncores']/ss[sid]['cores_node'])})

For presentation purposes, we can convert the session information into a DataFrame and rename some of the columns to improve readability.

[12]:
ssinfo = []
for sid in sids:
    ssinfo.append({'session'   : sid,
                   'resource'  : ss[sid]['pres'],
                   'cores_node': ss[sid]['cores_node'],
                   'gpus_node' : ss[sid]['gpus_node'],
                   'pilots'    : ss[sid]['npilot'],
                   'ps_active' : ss[sid]['npact'],
                   'cores'     : int(ss[sid]['ncores']/ss[sid]['smt']),
                   'gpus'      : ss[sid]['ngpus'],
                   'nodes'     : ss[sid]['nnodes'],
                   'tasks'     : ss[sid]['ntask'],
                   't_done'    : ss[sid]['ntdone'],
                   't_failed'  : ss[sid]['ntfailed']})

df_info = pd.DataFrame(ssinfo)
df_info
[12]:
                                 session         resource  cores_node  gpus_node  pilots  ps_active  cores  gpus  nodes  tasks  t_done  t_failed
0  rp.session.mosto.mturilli.019432.0005  local.localhost          64          8       1          1   2048   256     32   2048    2048         0
1  rp.session.mosto.mturilli.019432.0003  local.localhost          64          8       1          1    512    64      8   2048    2048         0
2  rp.session.mosto.mturilli.019432.0004  local.localhost          64          8       1          1   1024   128     16   2048    2048         0
3  rp.session.mosto.mturilli.019432.0002  local.localhost          64          8       1          1    256    32      4   2048    2048         0

We can then derive task information for each session.

[13]:
tsinfo = {}
for sid in sids:

    tsinfo[sid] = []
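    # Iterate over the tasks of the current session, not those of the
    # single session analyzed earlier.
    for task in ss[sid]['t'].get():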

        treq = {
            'executable'       : task.description['executable'],
            'cpu_process_type' : task.description['cpu_process_type'],
            'cpu_processes'    : task.description['cpu_processes'],
            'cpu_thread_type'  : task.description['cpu_thread_type'],
            'cpu_threads'      : task.description['cpu_threads'],
            'gpu_process_type' : task.description['gpu_process_type'],
            'gpu_processes'    : task.description['gpu_processes'],
            'gpu_thread_type'  : task.description['gpu_thread_type'],
            'gpu_threads'      : task.description['gpu_threads']
        }

        # Increment the counter of a matching set of requirements, if any;
        # otherwise record the new set of requirements with a count of one.
        for ti in tsinfo[sid]:
            if all(ti[k] == v for k, v in treq.items()):
                ti['n_of_tasks'] += 1
                break
        else:
            treq['n_of_tasks'] = 1
            tsinfo[sid].append(treq)
tsinfo
[13]:
{'rp.session.mosto.mturilli.019432.0005': [{'executable': '/home/mturilli/github/radical.analytics/docs/source/bin/radical-pilot-hello.sh',
   'cpu_process_type': '',
   'cpu_processes': 0,
   'cpu_thread_type': '',
   'cpu_threads': 0,
   'gpu_process_type': '',
   'gpu_processes': 0,
   'gpu_thread_type': '',
   'gpu_threads': 0,
   'n_of_tasks': 2048}],
 'rp.session.mosto.mturilli.019432.0003': [{'executable': '/home/mturilli/github/radical.analytics/docs/source/bin/radical-pilot-hello.sh',
   'cpu_process_type': '',
   'cpu_processes': 0,
   'cpu_thread_type': '',
   'cpu_threads': 0,
   'gpu_process_type': '',
   'gpu_processes': 0,
   'gpu_thread_type': '',
   'gpu_threads': 0,
   'n_of_tasks': 2048}],
 'rp.session.mosto.mturilli.019432.0004': [{'executable': '/home/mturilli/github/radical.analytics/docs/source/bin/radical-pilot-hello.sh',
   'cpu_process_type': '',
   'cpu_processes': 0,
   'cpu_thread_type': '',
   'cpu_threads': 0,
   'gpu_process_type': '',
   'gpu_processes': 0,
   'gpu_thread_type': '',
   'gpu_threads': 0,
   'n_of_tasks': 2048}],
 'rp.session.mosto.mturilli.019432.0002': [{'executable': '/home/mturilli/github/radical.analytics/docs/source/bin/radical-pilot-hello.sh',
   'cpu_process_type': '',
   'cpu_processes': 0,
   'cpu_thread_type': '',
   'cpu_threads': 0,
   'gpu_process_type': '',
   'gpu_processes': 0,
   'gpu_thread_type': '',
   'gpu_threads': 0,
   'n_of_tasks': 2048}]}
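
In the output above, every session lists the same task requirements. When comparing many sessions, that property can be checked instead of read off the printout. The sketch below assumes a dictionary shaped like tsinfo: same_requirements is a hypothetical helper, and the session names and abbreviated entries are stand-ins.

```python
def same_requirements(tsinfo):
    """Return True if every session lists the same task requirements.

    The comparison is order-sensitive: two sessions with the same entries
    in a different order are reported as different.
    """
    per_session = list(tsinfo.values())
    return all(reqs == per_session[0] for reqs in per_session[1:])


# Abbreviated stand-in for the tsinfo dictionary printed above.
tsinfo_demo = {
    'session.a': [{'cpu_processes': 0, 'gpu_processes': 0, 'n_of_tasks': 2048}],
    'session.b': [{'cpu_processes': 0, 'gpu_processes': 0, 'n_of_tasks': 2048}],
}

print(same_requirements(tsinfo_demo))
```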