Module t2py.T2Utils
Expand source code
#!/usr/bin/env python3
import json
import subprocess
from os.path import dirname, isfile, join, normpath, realpath
from typing import Any, Dict, List, Union
from multipledispatch import dispatch
class T2Utils:
"""Simple wrapper around T2 scripts and utilities."""
T2HOME: str = normpath(join(dirname(realpath(__file__)), '..', '..'))
"""The path to Tranalyzer home folder."""
T2PLHOME: str = join(T2HOME, 'plugins')
"""The path to Tranalyzer plugins folder, i.e., `$T2HOME/plugins`."""
T2BUILD: str = join(T2HOME, 'autogen.sh')
"""The path to the `autogen.sh` script, i.e., `$T2HOME/autogen.sh`."""
T2CONF: str = join(T2HOME, 'scripts', 't2conf', 't2conf')
"""The path to the `t2conf` script, i.e., `$T2HOME/scripts/t2conf/t2conf`."""
T2FM: str = join(T2HOME, 'scripts', 't2fm', 't2fm')
"""The path to the `t2fm` script, i.e., `$T2HOME/scripts/t2fm/t2fm`."""
T2PLUGIN: str = join(T2HOME, 'scripts', 't2plugin')
"""The path to the `t2plugin` script, i.e., `$T2HOME/scripts/t2plugin`."""
TAWK: str = join(T2HOME, 'scripts', 'tawk', 'tawk')
"""The path to the `tawk` script, i.e., `$T2HOME/scripts/tawk/tawk`."""
# Cache of the complete, sorted plugin list. It stays `None` until
# `T2Utils.plugins()` is called without a category, which stores its result
# here and reuses it on later unfiltered calls.
_all_plugins: List[str] = None
@staticmethod
def t2_exec(
    debug: bool = False
) -> str:
    """Build the path to the Tranalyzer2 release or debug executable.

    Parameters
    ----------
    debug : bool, default: False
        - If `True`, point to the executable in the `debug` folder.
        - If `False`, point to the executable in the `build` folder.

    Returns
    -------
    str
        `$T2HOME/tranalyzer2/<build|debug>/tranalyzer`.
        The path is only assembled; its existence is not checked here.

    Examples
    --------
    >>> T2Utils.t2_exec()
    '/home/user/tranalyzer2-0.9.0/tranalyzer2/build/tranalyzer'
    >>> T2Utils.t2_exec(debug=True)
    '/home/user/tranalyzer2-0.9.0/tranalyzer2/debug/tranalyzer'
    """
    if debug:
        build_dir = 'debug'
    else:
        build_dir = 'build'
    return join(T2Utils.T2HOME, 'tranalyzer2', build_dir, 'tranalyzer')
@staticmethod
def run_tranalyzer(
    pcap: str = None,
    iface: str = None,
    pcap_list: Union[str, List[str]] = None,
    output_prefix: str = None,
    log_file: bool = False,
    monitoring_file: bool = False,
    packet_mode: bool = False,
    plugin_folder: str = None,
    loading_list: str = None,
    plugins: List[str] = None,
    bpf: str = None,
    t2_exec: str = None,
    timeout: int = None,
    verbose: bool = False
):
    """Assemble a Tranalyzer2 command line and execute it.

    Exactly one of `pcap`, `pcap_list` and `iface` must be given.

    Parameters
    ----------
    pcap : str, default: None
        Path to a pcap file (passed with `-r`).
    iface : str, default: None
        Name of a network interface (passed with `-i`).
    pcap_list : str or list of str, default: None
        - Path to a file listing pcap files, e.g., `'/tmp/myPcaps.txt'`.
        - Or a list of pcap paths, e.g., `['file1.pcap', 'file2.pcap']`;
          the list is first written to `/tmp/pcap_list.txt`.

        Passed with `-R`.
    output_prefix : str, default: None
        Output prefix (passed with `-w`). Omitted from the command if not set.
    log_file : bool, default: False
        Add the `-l` option.
    monitoring_file : bool, default: False
        Add the `-m` option.
    packet_mode : bool, default: False
        Add the `-s` option.
    plugin_folder : str, default: None
        Path to the plugin folder (passed with `-p`).
    loading_list : str, default: None
        Path to a plugin loading list (passed with `-b`).
        If set, the `plugins` parameter is ignored.
    plugins : list of str, default: None
        Plugins to load. When given without `loading_list`, they are written
        to `/tmp/plugins.load`, which is then used as the loading list.
    bpf : str, default: None
        A BPF filter, appended as the last argument.
    t2_exec : str, default: None
        Path to the Tranalyzer2 executable.
        If not set, `T2Utils.t2_exec()` is used.
    timeout : int, default: None
        Number of seconds after which the process is terminated.
        `None` means no time limit.
    verbose : bool, default: False
        - If `True`, the output of the command is not captured.
        - If `False`, `stdout` and `stderr` are captured and discarded.

    Raises
    ------
    OSError
        If `iface` is not listed by `T2Utils.network_interfaces()`.
    RuntimeError
        - If none or more than one input (`pcap`, `pcap_list`, `iface`) is specified.
        - If the Tranalyzer2 executable is not an existing file.
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.
    subprocess.TimeoutExpired
        If the process was still running after `timeout` seconds.

    Examples
    --------
    >>> T2Utils.run_tranalyzer(pcap='/tmp/file.pcap', output_prefix='/tmp/')
    """
    # Exactly one input source must be supplied.
    sources = [src for src in (pcap, pcap_list, iface) if src]
    if len(sources) > 1:
        raise RuntimeError('Only one input (pcap, pcap_list, iface) may be specified')
    if not sources:
        raise RuntimeError('An input (pcap, pcap_list or iface) is required')

    binary = t2_exec if t2_exec else T2Utils.t2_exec()
    if not (binary and isfile(binary)):
        raise RuntimeError(f'Tranalyzer2 executable could not be found at {binary}')

    argv = [binary]

    # Input arguments: -r pcap or -R pcap_list or -i iface
    if pcap:
        argv += ['-r', pcap]
    elif pcap_list:
        list_path = pcap_list
        if isinstance(pcap_list, list):
            # A Python list is materialised as a file for -R.
            list_path = '/tmp/pcap_list.txt'
            T2Utils.create_pcap_list(pcap_list, list_path)
        argv += ['-R', list_path]
    elif iface:
        if iface not in T2Utils.network_interfaces():
            raise OSError(f'Interface {iface} does not exist locally')
        argv += ['-i', iface]

    # Output arguments: -w prefix / -l / -m / -s
    if output_prefix:
        argv += ['-w', output_prefix]
    switches = (
        (log_file, '-l'),
        (monitoring_file, '-m'),
        (packet_mode, '-s'),
    )
    for enabled, switch in switches:
        if enabled:
            argv.append(switch)

    # Optional arguments: an explicit loading list wins over `plugins`.
    plugin_list = loading_list
    if plugins and not plugin_list:
        plugin_list = '/tmp/plugins.load'
        T2Utils.create_plugin_list(plugins, outfile=plugin_list, verbose=verbose)
    if plugin_list:
        argv += ['-b', plugin_list]
    if plugin_folder:
        argv += ['-p', plugin_folder]
    if bpf:
        argv.append(bpf)

    subprocess.run(argv, capture_output=not verbose, check=True, timeout=timeout)
@staticmethod
def list_config(
    plugin: str
) -> List[str]:
    """Return the names of the configuration flags of `plugin`.

    Parameters
    ----------
    plugin : str
        The name of a plugin.

    Returns
    -------
    list of str
        One entry per non-empty output line of `t2conf`, stripped of
        surrounding whitespace. Empty if the command printed nothing or
        only the message `No configuration flags found for <plugin>`.

    Raises
    ------
    NameError
        If `plugin` is not in `T2Utils.valid_plugin_names()`.

    Notes
    -----
    Runs: `$ t2conf -y pluginName -I`.

    Examples
    --------
    >>> T2Utils.list_config('arpDecode')
    ['MAX_IP']
    """
    if plugin not in T2Utils.valid_plugin_names():
        raise NameError(f'Plugin {plugin} could not be found')

    proc = subprocess.run(
        [T2Utils.T2CONF, '-y', plugin, '-I'],
        text=True,
        stdout=subprocess.PIPE,
    )
    output = proc.stdout.strip()
    if not output:
        return []

    names = [line.strip() for line in output.split('\n')]
    # t2conf reports "nothing to configure" as a single message line.
    if names == [f'No configuration flags found for {plugin}']:
        return []
    return names
@staticmethod
def get_config(
    plugin: str,
    name: str,
    infile: str = None
) -> Any:
    """Get the value of a define from a configuration or header file.

    Parameters
    ----------
    plugin : str
        The name of a plugin.
    name : str
        The name of a configuration flag.
    infile : str, default: None
        File to read the value from; passed to `t2conf` with `-g`, e.g.,
        `'$T2PLHOME/pluginName/pluginName.cfg'` or the special value `'default'`.
        If `'source'` or `None`, no `-g` option is passed.

    Returns
    -------
    Any
        The text after the first `=` in the `t2conf` output, converted as follows:

        - `'no'` -> `0`, `'yes'` -> `1`
        - integer literal -> `int`
        - decimal literal -> `float`
        - value starting with `"` or `'` -> `str` with those quotes stripped
        - anything else -> the stripped `str` (possibly empty)

        `None` if `t2conf` printed nothing or the output contains no `=`.

    Raises
    ------
    NameError
        If `plugin` is not in `T2Utils.valid_plugin_names()`.

    See Also
    --------
    T2Utils.get_config_from_source : Alias for `T2Utils.get_config(plugin, name, 'source')`
    T2Utils.get_default : Alias for `T2Utils.get_config(plugin, name, 'default')`.

    Notes
    -----
    Runs: `$ t2conf -y pluginName -G name [-g infile]`.

    Examples
    --------
    >>> T2Utils.get_config('arpDecode', 'MAX_IP')
    10
    """
    if plugin not in T2Utils.valid_plugin_names():
        raise NameError(f'Plugin {plugin} could not be found')

    cmd = [T2Utils.T2CONF, '-y', plugin, '-G', name]
    if infile and infile != 'source':
        cmd.extend(['-g', infile])
    config = subprocess.run(cmd, text=True, stdout=subprocess.PIPE).stdout
    if not config:
        return None

    # Split on the FIRST '=' only, so values that themselves contain '='
    # are kept whole, and output without any '=' yields None instead of
    # raising IndexError.
    _, separator, raw = config.partition('=')
    if not separator:
        return None

    val = raw.strip()
    if not val:
        return val
    if val == 'no':
        return 0
    if val == 'yes':
        return 1

    unsigned = val.lstrip('-')
    try:
        if unsigned.isdigit():
            return int(val)
        if unsigned.replace('.', '', 1).isdigit():
            return float(val)
    except ValueError:
        # Looked numeric once the dashes were stripped (e.g. '--5') but is
        # not a valid number: keep the text as-is.
        return val

    if val.startswith('"'):
        return val.strip('"')
    if val.startswith("'"):
        return val.strip("'")
    return val
@staticmethod
def get_default(
    plugin: str,
    name: str
) -> Any:
    """Return the default value of a configuration flag.

    Thin alias for `T2Utils.get_config(plugin, name, infile='default')`.

    Parameters
    ----------
    plugin : str
        The name of a plugin.
    name : str
        The name of a configuration flag.

    Returns
    -------
    Any
        Whatever `T2Utils.get_config` returns for `infile='default'`.

    Raises
    ------
    NameError
        If the plugin could not be found (raised by `T2Utils.get_config`).

    See Also
    --------
    T2Utils.get_config : Get the value of a define from a configuration or header file.
    T2Utils.get_config_from_source : Get the value of a define from a header file.

    Notes
    -----
    Equivalent to: `$ t2conf pluginName -G name -g default`.

    Examples
    --------
    >>> T2Utils.get_default('arpDecode', 'MAX_IP')
    10
    """
    source_selector = 'default'
    return T2Utils.get_config(plugin, name, infile=source_selector)
@staticmethod
def get_config_from_source(
    plugin: str,
    name: str
) -> Any:
    """Return the value of a configuration flag as found in the header file.

    Thin alias for `T2Utils.get_config(plugin, name, infile='source')`.

    Parameters
    ----------
    plugin : str
        The name of a plugin.
    name : str
        The name of a configuration flag.

    Returns
    -------
    Any
        Whatever `T2Utils.get_config` returns for `infile='source'`.

    Raises
    ------
    NameError
        If the plugin could not be found (raised by `T2Utils.get_config`).

    See Also
    --------
    T2Utils.get_config : Get the value of a define from a configuration or header file.
    T2Utils.get_default : Get the default value of a define.

    Notes
    -----
    Equivalent to: `$ t2conf pluginName -G name`.

    Examples
    --------
    >>> T2Utils.get_config_from_source('arpDecode', 'MAX_IP')
    10
    """
    source_selector = 'source'
    return T2Utils.get_config(plugin, name, infile=source_selector)
@staticmethod
@dispatch(str, str, object, outfile=str)
def set_config(
    plugin: str,
    name: str,
    value: Any,
    outfile: str = None,
    verbose: bool = False
):
    """Set the value of a define in a configuration or header file.

    This is the `(plugin, name, value)` overload registered with `dispatch`.

    Parameters
    ----------
    plugin : str
        The name of a plugin.
    name : str
        The name of the configuration flag to set.
    value : Any
        The new value for `name`.
        Use `'default'` to reset the value to its default.
    outfile : str, default: None
        File to modify; passed to `t2conf` with `-g`.
        If `'source'` or `None`, no `-g` option is passed.
    verbose : bool, default: False
        - If `True`, the output of the command is not captured.
        - If `False`, `stdout` and `stderr` are captured and discarded.

    Raises
    ------
    NameError
        If `plugin` is not in `T2Utils.valid_plugin_names()`.
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    Notes
    -----
    Runs: `$ t2conf -y pluginName -D name=value [-g outfile]`.

    If the current value reported by `T2Utils.get_config` is wrapped in
    double quotes, `value` is converted to `str` and wrapped in double
    quotes as well (unless it already is).

    Examples
    --------
    >>> T2Utils.set_config('arpDecode', 'MAX_IP', 25)
    """
    if plugin not in T2Utils.valid_plugin_names():
        raise NameError(f'Plugin {plugin} could not be found')

    current = T2Utils.get_config(plugin, name)

    # Make sure value is properly quoted.
    # startswith/endswith are used instead of indexing so that an empty
    # current value (get_config may return '') does not raise IndexError.
    currently_quoted = (
        isinstance(current, str)
        and current.startswith('"')
        and current.endswith('"')
    )
    if currently_quoted:
        # `value` may be any object (e.g. an int): work on its text form
        # instead of indexing it directly.
        value = str(value)
        if not value.startswith('"'):
            value = '"' + value
        # The length check covers '' and '"', which would otherwise end up
        # as a single, unbalanced quote.
        if len(value) < 2 or not value.endswith('"'):
            value += '"'

    cmd = [T2Utils.T2CONF, '-y', plugin, '-D', f'{name}={value}']
    if outfile and outfile != 'source':
        cmd.extend(['-g', outfile])
    subprocess.run(cmd, capture_output=not verbose, check=True)
@staticmethod
@dispatch(str, dict, outfile=str)
def set_config(
    plugin: str,
    name_val: Dict[str, Any],
    outfile: str = None,
    verbose: bool = False
):
    """Set several defines of a plugin from a dictionary.

    This is the `(plugin, name_val)` overload registered with `dispatch`.
    Each key/value pair is forwarded, in dictionary order, to
    `T2Utils.set_config(plugin, name, value, ...)`.

    Parameters
    ----------
    plugin : str
        The name of a plugin.
    name_val : dict of str, Any
        Mapping of configuration flag names to their new values.
    outfile : str, default: None
        Forwarded unchanged for every entry.
        Use `'source'` to set the value in the header file.
    verbose : bool, default: False
        Forwarded unchanged for every entry.

    Raises
    ------
    NameError
        If the plugin could not be found.
    subprocess.CalledProcessError
        If one of the processes exited with a non-zero exit code.
        Entries processed before the failure remain applied.

    Notes
    -----
    One `t2conf` invocation is made per dictionary entry.

    Examples
    --------
    >>> T2Utils.set_config('arpDecode', {'MAX_IP': 25})
    """
    for flag in name_val:
        new_value = name_val[flag]
        T2Utils.set_config(plugin, flag, new_value, outfile=outfile, verbose=verbose)
@staticmethod
def set_default(
    plugin: Union[str, List[str]],
    name: Union[str, List[str]] = None,
    outfile: str = None,
    verbose: bool = False
):
    """Reset configuration flags to their default values.

    With `name`, the given flag(s) of a single plugin are reset through
    `T2Utils.set_config(plugin, flag, 'default', ...)`.
    Without `name`, `t2conf --reset` is run for the plugin(s).

    Parameters
    ----------
    plugin : str or list of str
        `plugin` can be:

        - a single plugin, e.g., `'pluginName'`
        - a list of plugins, e.g., `['pluginName1', 'pluginName2']`
        - `'all'` to apply the operation to all available plugins
    name : str or list of str, default: None
        The name(s) of the configuration flag(s) to reset.
        If not set, everything is reset.
    outfile : str, default: None
        Path to a configuration file where the changes will be reset.
        If `'source'` or `None`, no `-g` option is passed.
    verbose : bool, default: False
        - If `True`, the output of the command is not captured.
        - If `False`, `stdout` and `stderr` are captured and discarded.

    Raises
    ------
    NameError
        If a plugin could not be found (raised by `T2Utils.set_config`
        when `name` is given).
    NotImplementedError
        - If `name` is given together with a list of plugins or `'all'`.
        - If `outfile` names a file together with a list of plugins or `'all'`.
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    See Also
    --------
    T2Utils.set_config : Set the value of a define in a configuration or header file.
    T2Utils.reset_config : Reset all configuration flags from a plugin to their default values.

    Notes
    -----
    Runs `$ t2conf -y --reset <-a | plugins...> [-g outfile]` when `name` is not set.

    Examples
    --------
    >>> T2Utils.set_default('arpDecode', 'MAX_IP')
    >>> T2Utils.set_default('basicStats', ['BS_VAR', 'BS_STDDEV'])
    >>> T2Utils.set_default(['arpDecode', 'basicStats'])
    """
    is_list = isinstance(plugin, list)
    is_all = (not is_list) and plugin == 'all'

    if name:
        # Per-flag reset only makes sense for one concrete plugin.
        if is_list:
            raise NotImplementedError('Cannot reset specific flag(s) with list of plugins')
        if is_all:
            raise NotImplementedError("Cannot reset specific flag(s) for 'all' plugins")
        flags = name if isinstance(name, list) else [name]
        for flag in flags:
            T2Utils.set_config(plugin, flag, 'default', outfile=outfile, verbose=verbose)
        return

    # Full reset: a target file is only accepted for a single plugin.
    wants_file = bool(outfile) and outfile != 'source'
    if wants_file and (is_list or is_all):
        raise NotImplementedError("Cannot reset configuration in specific file with list of plugins or 'all' plugins")

    argv = [T2Utils.T2CONF, '-y', '--reset']
    if is_all:
        argv.append('-a')
    elif is_list:
        argv.extend(plugin)
    else:
        argv.append(plugin)
    if wants_file:
        argv.extend(['-g', outfile])
    subprocess.run(argv, capture_output=not verbose, check=True)
@staticmethod
def reset_config(
    plugin: Union[str, List[str]],
    name: Union[str, List[str]] = None,
    outfile: str = None,
    verbose: bool = False
):
    """Reset the configuration of one or more plugins to default values.

    Thin alias: every argument is handed to `T2Utils.set_default` unchanged.

    Parameters
    ----------
    plugin : str or list of str
        `plugin` can be:

        - a single plugin, e.g., `'pluginName'`
        - a list of plugins, e.g., `['pluginName1', 'pluginName2']`
    name : str or list of str, default: None
        The name(s) of the configuration flag(s) to reset.
        If not set, all flags are reset.
    outfile : str, default: None
        Path to a configuration file where the changes will be reset.
        Use `outfile='source'` to reset the value in the header file.
    verbose : bool, default: False
        - If `True`, the output of the command is not captured.
        - If `False`, `stdout` and `stderr` are captured and discarded.

    Raises
    ------
    NameError
        If a plugin could not be found.
    NotImplementedError
        If trying to reset specific flag(s) with multiple plugins.
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    See Also
    --------
    T2Utils.set_default : Reset the value of a define to its default.

    Notes
    -----
    Equivalent to: `$ t2conf pluginName --reset [-g outfile]`.

    Examples
    --------
    >>> T2Utils.reset_config('arpDecode')
    >>> T2Utils.reset_config('arpDecode', 'MAX_IP')
    >>> T2Utils.reset_config(['arpDecode', 'txtSink'])
    >>> T2Utils.reset_config('arpDecode', 'MAX_IP', '/tmp/arpDecode.config')
    """
    forwarded = {'outfile': outfile, 'verbose': verbose}
    T2Utils.set_default(plugin, name, **forwarded)
@staticmethod
def generate_config(
    plugin: str,
    outfile: str = None,
    verbose: bool = False
):
    """Ask `t2conf` to generate a configuration file for `plugin`.

    Parameters
    ----------
    plugin : str
        The name of a plugin.
    outfile : str, default: None
        The filename where to save the configuration.
        If not set, no filename is passed and `t2conf` picks its default.
    verbose : bool, default: False
        - If `True`, the output of the command is not captured.
        - If `False`, `stdout` and `stderr` are captured and discarded.

    Raises
    ------
    NameError
        If `plugin` is not in `T2Utils.valid_plugin_names()`.
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    See Also
    --------
    T2Utils.apply_config : Apply a configuration file to the sources.

    Notes
    -----
    Runs: `$ t2conf -y pluginName -g [file]`.

    Examples
    --------
    >>> T2Utils.generate_config('arpDecode')
    >>> T2Utils.generate_config('arpDecode', '/tmp/arpDecode.config')
    """
    if plugin not in T2Utils.valid_plugin_names():
        raise NameError(f'Plugin {plugin} could not be found')

    destination = [outfile] if outfile else []
    argv = [T2Utils.T2CONF, '-y', plugin, '-g', *destination]
    subprocess.run(argv, capture_output=not verbose, check=True)
@staticmethod
def apply_config(
    plugin: str,
    infile: str = None,
    verbose: bool = False
):
    """Ask `t2conf` to apply a configuration file to the sources of `plugin`.

    Parameters
    ----------
    plugin : str
        The name of a plugin.
    infile : str, default: None
        The name of the configuration file to apply.
        If not set, the special value `auto` is passed to `t2conf`.
    verbose : bool, default: False
        - If `True`, the output of the command is not captured.
        - If `False`, `stdout` and `stderr` are captured and discarded.

    Raises
    ------
    NameError
        If `plugin` is not in `T2Utils.valid_plugin_names()`.
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    See Also
    --------
    T2Utils.generate_config : Generate a configuration file for a plugin.

    Notes
    -----
    Runs: `$ t2conf -y pluginName -C <file|auto>`.

    Examples
    --------
    >>> T2Utils.apply_config('arpDecode')
    >>> T2Utils.apply_config('arpDecode', '/tmp/arpDecode.config')
    """
    if plugin not in T2Utils.valid_plugin_names():
        raise NameError(f'Plugin {plugin} could not be found')

    config_source = infile or 'auto'
    argv = [T2Utils.T2CONF, '-y', plugin, '-C', config_source]
    subprocess.run(argv, capture_output=not verbose, check=True)
@staticmethod
def build(
    plugin: Union[str, List[str]],
    plugin_folder: str = None,
    force_rebuild: bool = False,
    debug: bool = False,
    verbose: bool = False
):
    """Build a plugin or Tranalyzer2.

    Parameters
    ----------
    plugin : str or list of str
        `plugin` can be:

        - a single plugin, e.g., `'pluginName'`
        - `'tranalyzer2'` to build the core
        - a list of plugins, e.g., `['pluginName1', 'pluginName2']`
          (may include `'tranalyzer2'`)
        - the path to an existing plugin loading list, e.g., `'myPlugins.txt'`
        - `'all'` to apply the operation to all available plugins

        A list passed here is NOT modified.
    plugin_folder : str, default: None
        Path to the plugin folder (passed with `-p`).
    force_rebuild : bool, default: False
        Add the `-r` option.
    debug : bool, default: False
        Add the `-d` option (debug build).
    verbose : bool, default: False
        - If `True`, the output of the command is not captured.
        - If `False`, `stdout` and `stderr` are captured and discarded.

    Raises
    ------
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    See Also
    --------
    T2Utils.clean : Remove files generated by `T2Utils.build()`.

    Notes
    -----
    Runs `T2Utils.T2BUILD -y [-d] [-r] [-p folder]` followed by one of:

    - `-i tranalyzer2`
    - `-a`
    - `[-i tranalyzer2] pluginName1 pluginName2 ...`
    - `-b myPlugins.txt`
    - `pluginName`

    Examples
    --------
    >>> T2Utils.build('arpDecode')
    >>> T2Utils.build(['arpDecode', 'txtSink'])
    >>> T2Utils.build('/tmp/myPlugins.load')
    >>> T2Utils.build('all')
    """
    cmd = [T2Utils.T2BUILD, '-y']
    if debug:
        cmd.append('-d')
    if force_rebuild:
        cmd.append('-r')
    if plugin_folder:
        cmd.extend(['-p', plugin_folder])

    if plugin == 'tranalyzer2':
        cmd.extend(['-i', plugin])
    elif plugin == 'all':
        cmd.append('-a')
    elif isinstance(plugin, list):
        # The core is requested through `-i`, not as a plain plugin name.
        # Filter into a new sequence rather than calling `plugin.remove()`,
        # which would silently alter the list owned by the caller.
        if 'tranalyzer2' in plugin:
            cmd.extend(['-i', 'tranalyzer2'])
        cmd.extend(p for p in plugin if p != 'tranalyzer2')
    elif isfile(plugin):
        cmd.extend(['-b', plugin])
    else:
        cmd.append(plugin)

    subprocess.run(cmd, capture_output=not verbose, check=True)
@staticmethod
def clean(
    plugin: Union[str, List[str]],
    verbose: bool = False
):
    """Remove the files generated by `T2Utils.build()`.

    Parameters
    ----------
    plugin : str or list of str
        `plugin` can be:

        - a single plugin, e.g., `'pluginName'`
        - a list of plugins, e.g., `['pluginName1', 'pluginName2']`
        - the path to an existing plugin loading list, e.g., `'myPlugins.txt'`
        - `'all'` to apply the operation to all available plugins
    verbose : bool, default: False
        - If `True`, the output of the command is not captured.
        - If `False`, `stdout` and `stderr` are captured and discarded.

    Raises
    ------
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    See Also
    --------
    T2Utils.build : Build a plugin or Tranalyzer2.

    Notes
    -----
    Runs `T2Utils.T2BUILD -y -c` followed by one of:

    - `pluginName`
    - `pluginName1 pluginName2 ...`
    - `-b myPlugins.txt`
    - `-a`

    Examples
    --------
    >>> T2Utils.clean('arpDecode')
    >>> T2Utils.clean(['arpDecode', 'txtSink'])
    >>> T2Utils.clean('/tmp/myPlugins.load')
    >>> T2Utils.clean('all')
    """
    # Translate `plugin` into the trailing arguments of the command.
    if isinstance(plugin, list):
        selection = list(plugin)
    elif plugin == 'all':
        selection = ['-a']
    elif isfile(plugin):
        selection = ['-b', plugin]
    else:
        selection = [plugin]

    argv = [T2Utils.T2BUILD, '-y', '-c', *selection]
    subprocess.run(argv, capture_output=not verbose, check=True)
@staticmethod
def unload(
    plugin: Union[str, List[str]],
    plugin_folder: str = None,
    verbose: bool = False
):
    """Remove (unload) plugins from the plugin folder.

    Parameters
    ----------
    plugin : str or list of str
        `plugin` can be:

        - a single plugin, e.g., `'pluginName'`
        - a list of plugins, e.g., `['pluginName1', 'pluginName2']`
        - the path to an existing plugin loading list, e.g., `'myPlugins.txt'`
        - `'all'` to apply the operation to all available plugins
    plugin_folder : str, default: None
        The plugin folder from where the plugin must be unloaded
        (passed with `-p`). If not set, no `-p` option is passed.
    verbose : bool, default: False
        - If `True`, the output of the command is not captured.
        - If `False`, `stdout` and `stderr` are captured and discarded.

    Raises
    ------
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    Notes
    -----
    Runs `T2Utils.T2BUILD -y -u [-p folder]` followed by one of:

    - `pluginName`
    - `pluginName1 pluginName2 ...`
    - `-b myPlugins.txt`
    - `-a`

    Examples
    --------
    >>> T2Utils.unload('arpDecode')
    >>> T2Utils.unload(['arpDecode', 'txtSink'])
    >>> T2Utils.unload('/tmp/myPlugins.load')
    >>> T2Utils.unload('all')
    """
    folder_args = ['-p', plugin_folder] if plugin_folder else []

    # Translate `plugin` into the trailing arguments of the command.
    if isinstance(plugin, list):
        selection = list(plugin)
    elif plugin == 'all':
        selection = ['-a']
    elif isfile(plugin):
        selection = ['-b', plugin]
    else:
        selection = [plugin]

    argv = [T2Utils.T2BUILD, '-y', '-u', *folder_args, *selection]
    subprocess.run(argv, capture_output=not verbose, check=True)
@staticmethod
def plugin_description(
    plugin: str
) -> str:
    """Look up the short description of a plugin in the `t2plugin -l` listing.

    Parameters
    ----------
    plugin : str
        The name of a plugin.

    Returns
    -------
    str
        The third and following whitespace-separated columns of the first
        listing line whose first column equals `plugin`, joined with single
        spaces. `None` if no such line exists.

    Raises
    ------
    NameError
        If `plugin` is not in `T2Utils.valid_plugin_names()`.

    Notes
    -----
    Runs: `$ t2plugin -l` and scans its output.

    Examples
    --------
    >>> T2Utils.plugin_description('arpDecode')
    'Address Resolution Protocol (ARP)'
    """
    if plugin not in T2Utils.valid_plugin_names():
        raise NameError(f'Plugin {plugin} could not be found')

    listing = subprocess.run(
        [T2Utils.T2PLUGIN, '-l'],
        text=True,
        stdout=subprocess.PIPE,
    ).stdout
    for fields in map(str.split, listing.splitlines()):
        # A usable row has at least: name, number, description...
        if len(fields) > 2 and fields[0] == plugin:
            return ' '.join(fields[2:])
    return None
@staticmethod
def plugin_number(
    plugin: str
) -> str:
    """Look up the plugin number of a plugin in the `t2plugin -l` listing.

    Parameters
    ----------
    plugin : str
        The name of a plugin.

    Returns
    -------
    str
        The second whitespace-separated column of the first listing line
        whose first column equals `plugin` (kept as text, not converted to
        `int`). `None` if no such line exists.

    Raises
    ------
    NameError
        If `plugin` is not in `T2Utils.valid_plugin_names()`.

    Notes
    -----
    Runs: `$ t2plugin -l` and scans its output.

    Examples
    --------
    >>> T2Utils.plugin_number('arpDecode')
    '179'
    """
    if plugin not in T2Utils.valid_plugin_names():
        raise NameError(f'Plugin {plugin} could not be found')

    listing = subprocess.run(
        [T2Utils.T2PLUGIN, '-l'],
        text=True,
        stdout=subprocess.PIPE,
    ).stdout
    for fields in map(str.split, listing.splitlines()):
        # Rows with fewer than three columns are not plugin entries.
        if len(fields) > 2 and fields[0] == plugin:
            return fields[1]
    return None
@staticmethod
def plugins(
    category: Union[str, List[str]] = None
) -> List[str]:
    """Alphabetically list all the available plugins.

    Parameters
    ----------
    category : str or list of str, default: None
        `category` can be used to specify the types of plugins to list:

        - `'g'` : global
        - `'b'` : basic
        - `'l2'`: layer 2
        - `'l4'`: layer 3/4
        - `'l7'`: layer 7
        - `'a'` : application
        - `'m'` : math
        - `'c'` : classifier
        - `'o'` : output
        - `None`: all

        An empty string or an empty list is treated like `None`.

    Returns
    -------
    list of str
        Sorted list of plugin names, one per output line of `t2plugin`.
        The returned list is always a fresh object that the caller may modify.

    Notes
    -----
    Runs `$ t2plugin -H -N -l` (no category) or
    `$ t2plugin -H -N -l=cat1 [-l=cat2 ...]`.

    The unfiltered result is cached in `T2Utils._all_plugins`; later
    unfiltered calls are served from that cache once it is non-empty.

    Examples
    --------
    >>> T2Utils.plugins()
    >>> T2Utils.plugins('g')
    ['protoStats']
    >>> T2Utils.plugins(['g', 'l2'])
    ['arpDecode', 'cdpDecode', 'lldpDecode', 'protoStats', 'stpDecode', 'vtpDecode']
    """
    cmd = [T2Utils.T2PLUGIN, '-H', '-N']

    if not category:
        # No filter requested (None, '' or []). An empty list used to fall
        # through without any `-l` option while its output was still stored
        # as the "all plugins" cache; it is now handled like `None`.
        if T2Utils._all_plugins:
            # Hand out a copy so callers cannot corrupt the cache.
            return list(T2Utils._all_plugins)
        cmd.append('-l')
    elif isinstance(category, list):
        cmd.extend(f'-l={c}' for c in category)
    else:
        cmd.append(f'-l={category}')

    out = subprocess.run(cmd, text=True, stdout=subprocess.PIPE).stdout
    sorted_list = sorted(out.splitlines())

    if not category:
        T2Utils._all_plugins = sorted_list
        return list(sorted_list)
    return sorted_list
@staticmethod
def list_plugins(
    infile: str = None
) -> List[str]:
    """Return the plugins `t2conf -S` reports for a plugin loading list.

    Parameters
    ----------
    infile : str, default: None
        Name of a plugin loading list.
        If not set, no filename is passed and `t2conf` picks its default.

    Returns
    -------
    list of str
        The output lines of the command, sorted.

    Notes
    -----
    Runs: `$ t2conf -y -S [file]`.

    Examples
    --------
    >>> T2Utils.list_plugins()
    ['arpDecode', 'txtSink']
    >>> T2Utils.list_plugins('/tmp/myPlugins.load')
    ['arpDecode', 'txtSink']
    """
    list_arg = [infile] if infile else []
    proc = subprocess.run(
        [T2Utils.T2CONF, '-y', '-S', *list_arg],
        text=True,
        stdout=subprocess.PIPE,
    )
    names = proc.stdout.splitlines()
    names.sort()
    return names
@staticmethod
def create_pcap_list(
    pcaps: List[str],
    outfile: str = None
):
    """Write a list of pcap paths to a file, one path per line.

    Parameters
    ----------
    pcaps : list of str
        List of pcap files.
    outfile : str, default: None
        Name of the file to write (overwritten if it exists).
        If not set, defaults to `'/tmp/pcap_list.txt'`.

    Notes
    -----
    Equivalent to: `$ t2caplist pcaps > outfile`.

    Examples
    --------
    >>> T2Utils.create_pcap_list(['file1.pcap', 'file2.pcap'])
    >>> T2Utils.create_pcap_list(['file1.pcap', 'file2.pcap'], '/tmp/myPcaps.txt')
    """
    destination = outfile if outfile else '/tmp/pcap_list.txt'
    with open(destination, 'w') as handle:
        handle.writelines(entry + '\n' for entry in pcaps)
def create_plugin_list(
plugins: List[str],
outfile: str = None,
verbose: bool = False
):
"""Create a loading list of plugins.
Parameters
----------
plugins : list of str
List of plugin names.
outfile : str, default: None
Name of the file to save.
If `None`, default to `'plugins.load'` in the plugin folder.
verbose : bool, default: False
- If `True`, print the output (`stdout` and `stderr`) of the command.
- If `False`, do not print anything.
Raises
------
subprocess.CalledProcessError
If the process exited with a non-zero exit code (if `plugins is not None`).
Notes
-----
Equivalent to: `$ t2conf -L plugin1 plugin2 ...`.
Examples
--------
>>> T2Utils.create_plugin_list(['arpDecode', 'txtSink'])
>>> T2Utils.create_plugin_list(['arpDecode', 'txtSink'], '/tmp/myPlugins.txt')
"""
if outfile:
# simpler format, only active plugins
with open(outfile, 'w') as f:
for plugin in plugins:
f.write(plugin + '\n')
else:
cmd = [T2Utils.T2CONF, '-y', '-L']
cmd.extend(plugins)
subprocess.run(cmd, capture_output=not verbose, check=True)
@staticmethod
def tawk(
    program: str = None,
    filename: str = None,
    options: List[str] = None
) -> str:
    """Run `tawk` and return what it printed on `stdout`.

    Parameters
    ----------
    program : str, default: None
        A `tawk` program, e.g., `"$dstPort == 80 { print tuple5() }"`.
    filename : str, default: None
        Name of the file to analyze.
    options : list of str, default: None
        List of options to pass to `tawk`.

    Returns
    -------
    str
        The standard output of the command, decoded as UTF-8 and stripped
        of leading/trailing whitespace.

    Raises
    ------
    RuntimeError
        If `filename` was specified, but the file does not exist.
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    Notes
    -----
    The arguments are passed in this order: `program`, `options`, `filename`;
    arguments that are not set are left out.

    Examples
    --------
    >>> result = T2Utils.tawk(None, None, ['-V', 'flowStat=0x1234'])
    >>> print(result)
    >>> T2Utils.tawk('host("1.2.3.4") { aggr(proto()) }', 'file.txt')
    """
    if filename and not isfile(filename):
        raise RuntimeError(f"Input file '{filename}' does not exist")

    argv = [T2Utils.TAWK]
    argv += [program] if program else []
    argv += list(options) if options else []
    argv += [filename] if filename else []

    completed = subprocess.run(argv, capture_output=True, check=True)
    return completed.stdout.decode('utf8').strip()
@staticmethod
def follow_stream(
    filename: str,
    flow: int,
    output_format: Union[int, str] = 2,
    direction: str = None,
    payload_format: Union[int, str] = 4,
    reassembly: bool = True,
    colors: bool = True
) -> Union[str, List[str], List[bytearray], List[Dict[str, Any]]]:
    """Call `tawk` `follow_stream()` function with the specified options.

    Return the reassembled payload of flow with index `flow` and direction `direction`.

    Parameters
    ----------
    filename : str
        Name of the file to analyze.
    flow : int
        Index (`flowInd`) of the flow to analyze.
    output_format : int or str, default: 2
        The format to use for the output:

        - 0 or `'payload_only'`
        - 1 or `'info'`: Prefix each payload with packet/flow info
        - 2 or `'json'`
        - 3 or `'reconstruct'`
    direction : str, default: None
        The direction to follow (`'A'` or `'B'`).
        Use `None` to follow both directions.
    payload_format : int or str, default: 4
        The format to use for the payload:

        - 0 or 'ascii'
        - 1 or 'hexdump'
        - 2, 'raw' or 'binary'
        - 3 or 'base64'
        - 4 or 'bytearray': requested from `tawk` as format 2, with colors
          disabled, then decoded with `bytearray.fromhex`
    reassembly : bool, default: True
        Analyze TCP sequence numbers, reassemble and reorder TCP segments.
    colors : bool, default: True
        Output colors.

    Returns
    -------
    list of bytearray
        If `output_format` is 0 and `payload_format` is 4 or `'bytearray'`.
    list of dict
        If `output_format` is 2 (one dict per output line, `{}` for empty
        lines). With `payload_format` 4 or `'bytearray'`, the `'payload'`
        entry of each dict that has one is converted to a `bytearray`.
    list of str
        If `output_format` is 0 and `payload_format` is not 4 or `'bytearray'`.
    bytearray
        If `output_format` is 3 and `payload_format` is 4 or `'bytearray'`.
    str
        If none of the above applied.

    Raises
    ------
    RuntimeError
        - If `filename` was empty or `None`.
        - If `filename` does not exist (raised by `T2Utils.tawk`).
        - If `output_format` or `payload_format` was not one of the accepted values.
        - If `direction` was set to something other than `'A'` or `'B'`.
        - If `output_format` was 3 or `'reconstruct'` and `direction` was not `'A'` or `'B'`.
        - If `payload_format` was 4 or `'bytearray'` and `output_format` was 1 or `'info'`.
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    See Also
    --------
    T2Utils.tawk : Call `tawk` with the `program`, `options` and `filename` specified.

    Notes
    -----
    Equivalent to: `$ tawk 'follow_stream(flow, output_format, direction, payload_format, not reassembly, not colors)' filename`.

    Examples
    --------
    >>> result = T2Utils.follow_stream('file_flows.txt', 1, payload_format='hexdump', direction='B')
    >>> print(result)
    """
    if not filename:
        raise RuntimeError("Input file != None is required")

    if direction:
        if direction not in ('A', 'B'):
            raise RuntimeError(f"Invalid value for 'direction' parameter: '{direction}'")
        # The direction is a string literal inside the tawk program.
        direction = f'"{direction}"'

    # Normalise the symbolic output format names to their numeric code.
    if output_format in (0, 'payload_only'):
        output_format = 0
    elif output_format in (1, 'info'):
        output_format = 1
    elif output_format in (2, 'json'):
        output_format = 2
    elif output_format in (3, 'reconstruct'):
        output_format = 3
    else:
        raise RuntimeError(f"Invalid value for 'output_format' parameter: '{output_format}'")

    # `not direction` (rather than `is None`) also rejects an empty string,
    # which would otherwise be sent to tawk as "both directions".
    if output_format == 3 and not direction:
        raise RuntimeError("Payload reconstruction (output format 3 or 'reconstruct') requires a direction ('A' or 'B').")

    as_bytearray = payload_format in (4, 'bytearray')
    if payload_format in (0, 'ascii'):
        payload = 0
    elif payload_format in (1, 'hexdump'):
        payload = 1
    elif payload_format in (2, 'raw', 'binary'):
        payload = 2
    elif as_bytearray:
        if output_format == 1:
            raise RuntimeError("'payload_format' 4 or 'bytearray' cannot be specified with 'output_format' 1 or 'info'")
        # Ask tawk for format 2 without colors; the text is converted with
        # bytearray.fromhex() below.
        payload = 2
        colors = False
    elif payload_format in (3, 'base64'):
        payload = 3
    else:
        raise RuntimeError(f"Invalid value for 'payload_format' parameter: '{payload_format}'")

    prog = (
        'follow_stream('
        f'{flow},'
        f'{output_format},'
        f'{direction if direction else 0},'
        f'{payload},'
        f'{0 if reassembly else 1},'
        f'{0 if colors else 1}'
        ')'
    )

    result = T2Utils.tawk(prog, filename)

    if output_format == 0:
        lines = result.split('\n') if result else []
        if as_bytearray:
            return [bytearray.fromhex(s) for s in lines]
        return lines

    if output_format == 2:
        lines = result.split('\n') if result else []
        packets = [json.loads(s) if s else {} for s in lines]
        if as_bytearray:
            for packet in packets:
                # Empty lines were turned into `{}` above and carry no
                # payload; skip them instead of raising KeyError.
                if 'payload' in packet:
                    packet['payload'] = bytearray.fromhex(packet['payload'])
        return packets

    if output_format == 3 and as_bytearray:
        return bytearray.fromhex(result)

    return result
@staticmethod
def load_plugins(
    plugin: Union[str, List[str]] = None
):
    """Instantiate plugins as `T2Plugin` objects and attach them to `T2Utils`.

    Every loaded plugin becomes reachable as a class attribute named after
    the plugin, e.g., ``T2Utils.pluginName``.

    Parameters
    ----------
    plugin : str or list of str, default: None
        Selects the plugin(s) to load. `plugin` can be:

        - a plugin name
        - a list of plugin names
        - `None` or 'all' to load every valid plugin (slow)

    Raises
    ------
    NameError
        If a requested name is not listed by `T2Utils.valid_plugin_names()`.
        Names processed before the unknown one have already been attached.

    Examples
    --------
    >>> T2Utils.load_plugins('arpDecode')
    >>> T2Utils.load_plugins(['arpDecode', 'txtSink'])
    >>> T2Utils.load_plugins()
    >>> T2Utils.arpDecode
    <t2py.T2Plugin.T2Plugin object at 0x16055a370>
    """
    from .T2Plugin import T2Plugin

    known = T2Utils.valid_plugin_names()

    # Normalize the request to a list of names
    if isinstance(plugin, list):
        requested = plugin
    elif plugin and plugin != 'all':
        requested = [plugin]
    else:
        requested = known

    for name in requested:
        if name not in known:
            raise NameError(f'Plugin {name} could not be found')
        setattr(T2Utils, name, T2Plugin(name))
@staticmethod
def _tsv_to_list(
    filename: str,
    delimiter: str = '\t'
) -> List[Dict[str, Any]]:
    """Convert a tsv/csv file to a python list of JSON objects.

    The first row of the file is used as header. Leading `%` characters and
    surrounding whitespace are removed from the first column name.

    Parameters
    ----------
    filename : str
        The name of the file to load.
    delimiter : str, default: '\\t'
        The delimiter used to separate columns in the input file.

    Returns
    -------
    list of dict
        One dictionary per data row, mapping the column names to the
        (string) values of the row. An empty file yields an empty list.
    """
    import csv

    # newline='' lets the csv module handle line endings itself,
    # as recommended by the csv documentation
    with open(filename, newline='') as f:
        reader = csv.DictReader(f, delimiter=delimiter)
        header = reader.fieldnames
        if header is None:
            # Empty file: no header row, hence no data rows either
            return []
        # `header` is the list owned by the reader, so the in-place update
        # is reflected in the keys of every row returned below
        header[0] = header[0].lstrip('%').strip()
        return list(reader)
@staticmethod
def to_json_array(
    infile: str,
    delimiter: str = '\t'
) -> List[Dict[str, Any]]:
    """Convert a flow or packet file to an array of JSON objects.

    Files whose name ends with `.json` are read line by line, one JSON value
    per line. Lines holding only the array brackets (`[`, `]`), blank lines
    and the `,` separating two records (leading or trailing) are ignored.
    Any other file is handed over to `T2Utils._tsv_to_list()`.

    Parameters
    ----------
    infile : str
        Path to a flow or packet file.
    delimiter : str, default: '\\t'
        Field delimiter used in the input file (ignored for `.json` files).

    Returns
    -------
    list of dict
        The list of rows contained in `infile`.
        Each row is represented as a dict with the column names as key.

    Raises
    ------
    json.decoder.JSONDecodeError
        If `infile` was a JSON file and the decoding process failed.

    Examples
    --------
    >>> flows = T2Utils.to_json_array('/tmp/file_flows.txt')
    >>> flows
    [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...]
    """
    if not infile.endswith('.json'):
        return T2Utils._tsv_to_list(infile, delimiter)

    data = []
    with open(infile) as f:
        for line in f:
            # Whitespace must go first: the trailing newline would otherwise
            # shield a trailing ',' from being stripped
            record = line.strip().strip(',').strip()
            if record in ('', '[', ']'):
                continue
            data.append(json.loads(record))
    return data
@staticmethod
def to_pandas(
    infile: str,
    delimiter: str = None
) -> "pandas.core.frame.DataFrame":
    """Load a flow or packet file into a pandas `DataFrame`.

    Files whose name ends with `.json` are first decoded with
    `T2Utils.to_json_array()`; any other file is read with
    `pandas.read_table()`.

    Parameters
    ----------
    infile : str
        Path to a flow or packet file.
    delimiter : str, default: None
        Field delimiter, passed as is to `pandas.read_table()`.
        Not used for `.json` files.

    Returns
    -------
    pd.DataFrame
        DataFrame holding the tabular data stored in `infile`.

    Examples
    --------
    >>> df = T2Utils.to_pandas('/tmp/file_flows.txt')
    >>> type(df)
    <class 'pandas.core.frame.DataFrame'>
    """
    import pandas as pd

    if not infile.endswith('.json'):
        return pd.read_table(infile, delimiter=delimiter)

    records = T2Utils.to_json_array(infile)
    return pd.DataFrame(records)
@staticmethod
def to_pdf(
    pcap: str = None,
    flow_file: str = None,
    prefix: str = None,
    config: bool = True,
    reset_config: bool = True,
    open_pdf: bool = True,
    verbose: bool = False
):
    """Generate a PDF/LaTeX report from a PCAP or Tranalyzer flow file.

    Exactly one input (`pcap` or `flow_file`) must be provided.

    Parameters
    ----------
    pcap : str, default: None
        PCAP file to analyze (passed to `t2fm` with `-r`).
    flow_file : str, default: None
        Flow file to analyze (passed to `t2fm` with `-F`).
    prefix : str, default: None
        Output prefix (passed to `t2fm` with `-w`).
    config : bool, default: True
        If `True`, configure and build Tranalyzer2 and the plugins as required (`-b`).
    reset_config : bool, default: True
        If `True`, reset tranalyzer2 and the plugins configuration at the end (`--reset`).
    open_pdf : bool, default: True
        If `True`, automatically open the generated PDF (`-A`).
    verbose : bool, default: False
        - If `True`, print the output (`stdout` and `stderr`) of the command.
        - If `False`, do not print anything.

    Raises
    ------
    RuntimeError
        - If none of the inputs (pcap or flow_file) was specified.
        - If both inputs (pcap and flow_file) were specified.
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    Notes
    -----
    Equivalent to: `$ t2fm -y [-r pcap|-F flow_file] [-w prefix] [-b] [--reset] [-A]`.

    Examples
    --------
    >>> T2Utils.to_pdf(pcap='file.pcap')
    >>> T2Utils.to_pdf(flow_file='file_flows.txt')
    """
    if pcap and flow_file:
        raise RuntimeError('Only one of (pcap, flow_file) must be specified.')
    if not (pcap or flow_file):
        raise RuntimeError('One of (pcap, flow_file) must be specified.')

    # Exactly one of the two inputs is set at this point
    source = ['-r', pcap] if pcap else ['-F', flow_file]
    target = ['-w', prefix] if prefix else []
    toggles = (('-b', config), ('--reset', reset_config), ('-A', open_pdf))
    switches = [flag for flag, enabled in toggles if enabled]

    cmd = [T2Utils.T2FM, '-y', *source, *target, *switches]
    subprocess.run(cmd, capture_output=not verbose, check=True)
@staticmethod
def network_interfaces() -> List[str]:
    """Return the names of the network interfaces reported by the system.

    The names are taken from `socket.if_nameindex()`, in the order returned
    by that call.

    Returns
    -------
    list of str
        The list of available network interfaces.

    Examples
    --------
    >>> T2Utils.network_interfaces()
    ['lo0', 'eth0']
    """
    import socket

    # if_nameindex() yields (index, name) pairs; only the name is of interest
    return [name for _index, name in socket.if_nameindex()]
@staticmethod
def valid_plugin_names() -> List[str]:
    """Return the sorted list of names accepted as plugin name.

    The list is made of `'tranalyzer2'` and of every name returned by
    `T2Utils.plugins()`.

    Returns
    -------
    list of str
        The list of valid plugin names.

    Examples
    --------
    >>> T2Utils.valid_plugin_names()
    ['arpDecode', ..., 'tranalyzer2', ...]
    """
    return sorted(['tranalyzer2', *T2Utils.plugins()])
Classes
class T2Utils
-
Simple wrapper around T2 scripts and utilities.
Expand source code
class T2Utils: """Simple wrapper around T2 scripts and utilities.""" T2HOME: str = normpath(join(dirname(realpath(__file__)), '..', '..')) """The path to Tranalyzer home folder.""" T2PLHOME: str = join(T2HOME, 'plugins') """The path to Tranalyzer plugins folder, i.e., `$T2HOME/plugins`.""" T2BUILD: str = join(T2HOME, 'autogen.sh') """The path to the `autogen.sh` script, i.e., `$T2HOME/autogen.sh`.""" T2CONF: str = join(T2HOME, 'scripts', 't2conf', 't2conf') """The path to the `t2conf` script, i.e., `$T2HOME/scripts/t2conf/t2conf`.""" T2FM: str = join(T2HOME, 'scripts', 't2fm', 't2fm') """The path to the `t2fm` script, i.e., `$T2HOME/scripts/t2fm/t2fm`.""" T2PLUGIN: str = join(T2HOME, 'scripts', 't2plugin') """The path to the `t2plugin` script, i.e., `$T2HOME/scripts/t2plugin`.""" TAWK: str = join(T2HOME, 'scripts', 'tawk', 'tawk') """The path to the `tawk` script, i.e., `$T2HOME/scripts/tawk/tawk`.""" _all_plugins: List[str] = None @staticmethod def t2_exec( debug: bool = False ) -> str: """Return the path to Tranalyzer2 release or debug executable. Parameters ---------- debug : bool, default: False - If `True`, returns the path to the debug executable. - If `False`, returns the path to the release executable. Returns ------- str The path to Tranalyzer2 executable. Examples -------- >>> T2Utils.t2_exec() /home/user/tranalyzer2-0.9.0/tranalyzer2/build/tranalyzer >>> T2Utils.t2_exec(debug=True) /home/user/tranalyzer2-0.9.0/tranalyzer2/debug/tranalyzer """ folder: str = 'debug' if debug else 'build' return join(T2Utils.T2HOME, 'tranalyzer2', folder, 'tranalyzer') @staticmethod def run_tranalyzer( pcap: str = None, iface: str = None, pcap_list: Union[str, List[str]] = None, output_prefix: str = None, log_file: bool = False, monitoring_file: bool = False, packet_mode: bool = False, plugin_folder: str = None, loading_list: str = None, plugins: List[str] = None, bpf: str = None, t2_exec: str = None, timeout: int = None, verbose: bool = False ): """Run Tranalyzer2. 
Parameters ---------- pcap : str, default: None Path to a pcap file. iface : str, default: None Name of a network interface. pcap_list : str or list of str, default: None - Path to a list of pcap files, e.g., `'/tmp/myPcaps.txt'`. - Or list of path to pcap files, e.g., `['file1.pcap', 'file2.pcap']`. output_prefix : str, default: None If `None`, automatically derived from input. log_file : bool, default: False Save the final report in a `_log.txt` file. monitoring_file : bool, default: False Save the monitoring report in a `_monitoring.txt` file. packet_mode : bool, default: False Activate Tranalyzer2 packet mode. plugin_folder : str, default: None Path to the plugin folder. loading_list : str, default: None Path to a plugin loading list. If set, the `plugins` parameter is ignored. plugins : list of str, default: None The list of plugins to use. If `None`, load the plugins available in the plugin folder. bpf : str, default: None A BPF filter. t2_exec : str, default: None Path to the Tranalyzer2 executable. If `None`, use `T2Utils.t2_exec()` timeout : int, default: None Number of seconds after which to terminate the process. If `None`, run forever or until the end of file is reached. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ OSError If specified interface `iface` does not exist locally. RuntimeError If none or more than one input (`pcap`, `pcap_list`, `iface`) is specified. subprocess.CalledProcessError If the process exited with a non-zero exit code. subprocess.TimeoutExpired If `timeout` was expired and its value reached. 
Examples -------- >>> T2Utils.run_tranalyzer(pcap='/tmp/file.pcap', output_prefix='/tmp/') """ if (pcap and pcap_list) or (pcap and iface) or (iface and pcap_list): raise RuntimeError('Only one input (pcap, pcap_list, iface) may be specified') if not pcap and not iface and not pcap_list: raise RuntimeError('An input (pcap, pcap_list or iface) is required') if not t2_exec: t2_exec = T2Utils.t2_exec() if not t2_exec or not isfile(t2_exec): raise RuntimeError(f'Tranalyzer2 executable could not be found at {t2_exec}') cmd = [t2_exec] # Input arguments: -r pcap or -R pcap_list or -i iface if pcap: cmd.extend(['-r', pcap]) elif pcap_list: if isinstance(pcap_list, list): T2Utils.create_pcap_list(pcap_list, '/tmp/pcap_list.txt') pcap_list = '/tmp/pcap_list.txt' cmd.extend(['-R', pcap_list]) elif iface: if iface not in T2Utils.network_interfaces(): raise OSError(f'Interface {iface} does not exist locally') cmd.extend(['-i', iface]) # Output arguments: -w prefix / -l / -m / -s if output_prefix: cmd.extend(['-w', output_prefix]) if log_file: cmd.append('-l') if monitoring_file: cmd.append('-m') if packet_mode: cmd.append('-s') # Optional arguments if not loading_list and plugins: loading_list = '/tmp/plugins.load' T2Utils.create_plugin_list(plugins, outfile=loading_list, verbose=verbose) if loading_list: cmd.extend(['-b', loading_list]) if plugin_folder: cmd.extend(['-p', plugin_folder]) if bpf: cmd.append(bpf) subprocess.run(cmd, capture_output=not verbose, check=True, timeout=timeout) @staticmethod def list_config( plugin: str ) -> List[str]: """List the configuration flags available for `plugin`. Parameters ---------- plugin : str The name of a plugin Returns ------- list of str The list of configuration flags available for the plugin. Raises ------ NameError If the plugin could not be found. Notes ----- Equivalent to: `$ t2conf pluginName -I`. 
Examples -------- >>> T2Utils.list_config('arpDecode') ['MAX_IP'] """ if plugin not in T2Utils.valid_plugin_names(): raise NameError(f'Plugin {plugin} could not be found') cmd = [T2Utils.T2CONF, '-y', plugin, '-I'] config = subprocess.run(cmd, text=True, stdout=subprocess.PIPE).stdout.strip() all_flags = config.split('\n') if config else list() flags = [f.strip() for f in all_flags] if len(flags) == 1 and flags[0] == f'No configuration flags found for {plugin}': flags = list() return flags @staticmethod def get_config( plugin: str, name: str, infile: str = None ) -> Any: """Get the value of a define from a configuration or header file. Parameters ---------- plugin : str The name of a plugin. name : str The name of a configuration flag. infile : str, default: None Specify from which file (absolute path) to get the value from, e.g., `'$T2PLHOME/pluginName/pluginName.cfg'`. Use the special value `'default'` to get the value from `default.config`. If `'source'` or `None`, get the value from the header file. Raises ------ NameError If the plugin could not be found. See Also -------- T2Utils.get_config_from_source : Alias for `T2Utils.get_config(plugin, name, 'source')` T2Utils.get_default : Alias for `T2Utils.get_config(plugin, name, 'default')`. Notes ----- Equivalent to: `$ t2conf pluginName -G name [-g infile]`. 
Examples -------- >>> T2Utils.get_config('arpDecode', 'MAX_IP') 10 """ if plugin not in T2Utils.valid_plugin_names(): raise NameError(f'Plugin {plugin} could not be found') cmd = [T2Utils.T2CONF, '-y', plugin, '-G', name] if infile and infile != 'source': cmd.extend(['-g', infile]) config = subprocess.run(cmd, text=True, stdout=subprocess.PIPE).stdout val = config.split('=')[1].strip() if config else None if val: if val == 'no': val = 0 elif val == 'yes': val = 1 elif val.lstrip('-').isdigit(): val = int(val) elif val.lstrip('-').replace('.', '', 1).isdigit(): val = float(val) elif val.startswith('"'): val = val.strip('"') elif val.startswith("'"): val = val.strip("'") return val @staticmethod def get_default( plugin: str, name: str ) -> Any: """Get the default value of a define. Parameters ---------- plugin : str The name of a plugin. name : str The name of a configuration flag. Raises ------ NameError If the plugin could not be found. See Also -------- T2Utils.get_config : Get the value of a define from a configuration or header file. T2Utils.get_config_from_source : Get the value of a define from a header file. Notes ----- Equivalent to: `$ t2conf pluginName -G name -g default`. Alias for `T2Utils.get_config(plugin, name, infile='default')`. Examples -------- >>> T2Utils.get_default('arpDecode', 'MAX_IP') 10 """ return T2Utils.get_config(plugin, name, 'default') @staticmethod def get_config_from_source( plugin: str, name: str ) -> Any: """Get the value of a define from the header file, e.g., `pluginName.h`. Parameters ---------- plugin : str The name of a plugin. name : str The name of a configuration flag. Raises ------ NameError If the plugin could not be found. See Also -------- T2Utils.get_config : Get the value of a define from a configuration or header file. T2Utils.get_default : Get the default value of a define. Notes ----- Equivalent to: `$ t2conf pluginName -G name`. Alias for `T2Utils.get_config(plugin, name, infile='source')`. 
Examples -------- >>> T2Utils.get_config_from_source('arpDecode', 'MAX_IP') 10 """ return T2Utils.get_config(plugin, name, 'source') @staticmethod @dispatch(str, str, object, outfile=str) def set_config( plugin: str, name: str, value: Any, outfile: str = None, verbose: bool = False ): """Set the value of a define in a configuration or header file. Parameters ---------- plugin : str The name of a plugin. name : str The name of the configuration flag to set. value : Any The new value for `name`. Use `'default'` to reset the value to its default. outfile : str, default: None Use `'source'` to set the value in the header file. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ NameError If the plugin could not be found. subprocess.CalledProcessError If the process exited with a non-zero exit code. Notes ----- Equivalent to: `$ t2conf pluginName -D name=value [-g infile]`. Examples -------- >>> T2Utils.set_config('arpDecode', 'MAX_IP', 25) """ if plugin not in T2Utils.valid_plugin_names(): raise NameError(f'Plugin {plugin} could not be found') current = T2Utils.get_config(plugin, name) # Make sure value is properly quoted if isinstance(current, str): if current[0] == '"' and current[-1] == '"': if value[0] != '"': value = '"' + value if value[-1] != '"': value += '"' cmd = [T2Utils.T2CONF, '-y', plugin, '-D', f'{name}={value}'] if outfile and outfile != 'source': cmd.extend(['-g', outfile]) subprocess.run(cmd, capture_output=not verbose, check=True) @staticmethod @dispatch(str, dict, outfile=str) def set_config( plugin: str, name_val: Dict[str, Any], outfile: str = None, verbose: bool = False ): """Set the value of a define in a configuration or header file. Parameters ---------- plugin : str The name of a plugin. name_val : dict of str, Any Dictionary of key/value to set. outfile : str, default: None Use `'source'` to set the value in the header file. 
verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ NameError If the plugin could not be found. subprocess.CalledProcessError If one of the processes exited with a non-zero exit code. Notes ----- Equivalent to: `$ t2conf pluginName -D name1=value1 [-D name2=value2 ...] [-g outfile]`. Examples -------- >>> T2Utils.set_config('arpDecode', 'MAX_IP', 25) """ for name, value in name_val.items(): T2Utils.set_config(plugin, name, value, outfile=outfile, verbose=verbose) @staticmethod def set_default( plugin: Union[str, List[str]], name: Union[str, List[str]] = None, outfile: str = None, verbose: bool = False ): """Reset the value of a define to its default. Parameters ---------- plugin : str or list of str `plugin` can be: - a single plugin, e.g., `'pluginName'` - a list of plugins, e.g., `['pluginName1', 'pluginName2']` - `all` to apply the operation to all available plugins name : str or list of str, default: None The name of a configuration flag to reset. If `name` is set, only reset the value for the specified flag(s). outfile : str, default: None Path to a configuration file where the changes will be rest. Use `outfile='source'` to set the value in the header file. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ NameError If a plugin could not be found. NotImplementedError If trying to reset specific flag(s) with multiple plugins. subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- T2Utils.set_config : Set the value of a define in a configuration or header file. T2Utils.reset_config : Reset all configuration flags from a plugin to their default values. Notes ----- Equivalent to: `$ t2conf pluginName -D name=default [-g outfile]`. Alias for `T2Utils.set_config(plugin, name, 'default', outfile)`. 
Examples -------- >>> T2Utils.set_default('arpDecode', 'MAX_IP') >>> T2Utils.set_default('basicStats', ['BS_VAR', 'BS_STDDEV']) >>> T2Utils.set_default(['arpDecode', 'basicStats']) """ if name: if isinstance(plugin, list): raise NotImplementedError('Cannot reset specific flag(s) with list of plugins') elif plugin == 'all': raise NotImplementedError("Cannot reset specific flag(s) for 'all' plugins") if isinstance(name, list): for n in name: T2Utils.set_config(plugin, n, 'default', outfile=outfile, verbose=verbose) else: T2Utils.set_config(plugin, name, 'default', outfile=outfile, verbose=verbose) else: cmd = [T2Utils.T2CONF, '-y', '--reset'] accept_outfile = False if plugin == 'all': cmd.append('-a') elif isinstance(plugin, list): cmd.extend(plugin) else: accept_outfile = True cmd.append(plugin) if outfile and outfile != 'source': if not accept_outfile: raise NotImplementedError("Cannot reset configuration in specific file with list of plugins or 'all' plugins") cmd.extend(['-g', outfile]) subprocess.run(cmd, capture_output=not verbose, check=True) @staticmethod def reset_config( plugin: Union[str, List[str]], name: Union[str, List[str]] = None, outfile: str = None, verbose: bool = False ): """Reset all configuration flags from a plugin to their default values. Parameters ---------- plugin : str or list of str `plugin` can be: - a single plugin, e.g., `'pluginName'` - a list of plugins, e.g., `['pluginName1', 'pluginName2']` name : str, default: None The name of a configuration flag to reset. If `name` is set, only reset the value for the specified flag(s). outfile : str, default: None Path to a configuration file where the changes will be rest. Use `outfile='source'` to reset the value in the header file. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ NameError If a plugin could not be found. 
NotImplementedError If trying to reset specific flag(s) with multiple plugins. subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- T2Utils.set_default : Set the value of a define in a configuration or header file. Notes ----- Equivalent to: `$ t2conf pluginName --reset [-g outfile]`. Alias for `T2Utils.set_default(plugin, None, outfile)`. Examples -------- >>> T2Utils.reset_config('arpDecode') >>> T2Utils.reset_config('arpDecode', 'MAX_IP') >>> T2Utils.reset_config(['arpDecode', 'txtSink']) >>> T2Utils.reset_config('arpDecode', 'MAX_IP', '/tmp/arpDecode.config') """ T2Utils.set_default(plugin, name, outfile=outfile, verbose=verbose) @staticmethod def generate_config( plugin: str, outfile: str = None, verbose: bool = False ): """Generate a configuration file for `plugin`. Parameters ---------- plugin : str The name of a plugin. outfile : str, default: None The filename where to save the configuration. If `None`, default to `'pluginName.config'` in the plugin home folder. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ NameError If the plugin could not be found. subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- T2Utils.apply_config : Apply a configuration file to the sources. Notes ----- Equivalent to: `$ t2conf pluginName -g [file]`. Examples -------- >>> T2Utils.generate_config('arpDecode') >>> T2Utils.generate_config('arpDecode', '/tmp/arpDecode.config') """ if plugin not in T2Utils.valid_plugin_names(): raise NameError(f'Plugin {plugin} could not be found') cmd = [T2Utils.T2CONF, '-y', plugin, '-g'] if outfile: cmd.append(outfile) subprocess.run(cmd, capture_output=not verbose, check=True) @staticmethod def apply_config( plugin: str, infile: str = None, verbose: bool = False ): """Apply a configuration file to the sources. 
Parameters ---------- plugin : str The name of a plugin. infile : str, default: None The name of the configuration file to apply. If `None`, default to `'pluginName.config'` in the plugin home folder. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- T2Utils.generate_config : Generate configuration files for plugins Notes ----- Equivalent to: `$ t2conf pluginName -C [file|auto]`. Examples -------- >>> T2Utils.apply_config('arpDecode') >>> T2Utils.apply_config('arpDecode', '/tmp/arpDecode.config') """ if plugin not in T2Utils.valid_plugin_names(): raise NameError(f'Plugin {plugin} could not be found') cmd = [T2Utils.T2CONF, '-y', plugin, '-C'] cmd.append(infile if infile else 'auto') subprocess.run(cmd, capture_output=not verbose, check=True) @staticmethod def build( plugin: Union[str, List[str]], plugin_folder: str = None, force_rebuild: bool = False, debug: bool = False, verbose: bool = False ): """Build a plugin or Tranalyzer2. Parameters ---------- plugin : str or list of str `plugin` can be: - a single plugin, e.g., `'pluginName'` - a list of plugins, e.g., `['pluginName1', 'pluginName2']` - a plugin loading list, e.g., `'myPlugins.txt'` - `all` to apply the operation to all available plugins plugin_folder : str, default: None Path to the plugin folder. If `None`, default to `'$HOME/.tranalyzer/plugins'`. force_rebuild : bool, default: False Force the rebuild of the configuration files (`$ t2build -r ...`). debug : bool, default: False Build the plugin(s) in debug mode. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ subprocess.CalledProcessError If the process exited with a non-zero exit code. 
See Also -------- T2Utils.clean : Remove files generated by `T2Utils.build()`. Notes ----- Equivalent to: - `$ t2build pluginName` - `$ t2build pluginName1 pluginName2` - `$ t2build -b myPlugins.txt` - `$ t2build -b -a` Examples -------- >>> T2Utils.build('arpDecode') >>> T2Utils.build(['arpDecode', 'txtSink']) >>> T2Utils.build('/tmp/myPlugins.load') >>> T2Utils.build('all') """ cmd = [T2Utils.T2BUILD, '-y'] if debug: cmd.append('-d') if force_rebuild: cmd.append('-r') if plugin_folder: cmd.extend(['-p', plugin_folder]) if plugin == 'tranalyzer2': cmd.extend(['-i', plugin]) elif plugin == 'all': cmd.append('-a') elif isinstance(plugin, list): if 'tranalyzer2' in plugin: cmd.extend(['-i', 'tranalyzer2']) plugin.remove('tranalyzer2') cmd.extend(plugin) elif isfile(plugin): cmd.extend(['-b', plugin]) else: cmd.append(plugin) subprocess.run(cmd, capture_output=not verbose, check=True) @staticmethod def clean( plugin: Union[str, List[str]], verbose: bool = False ): """Remove files generated by `T2Utils.build()`. Parameters ---------- plugin : str or list of str `plugin` can be: - a single plugin, e.g., `'pluginName'` - a list of plugins, e.g., `['pluginName1', 'pluginName2']` - a plugin loading list, e.g., `'myPlugins.txt'` - `all` to apply the operation to all available plugins verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- T2Utils.build : Build a plugin or Tranalyzer2. 
Notes ----- Equivalent to: - `$ t2build -c pluginName` - `$ t2build -c pluginName1 pluginName2` - `$ t2build -c -b myPlugins.txt` - `$ t2build -c -a` Examples -------- >>> T2Utils.clean('arpDecode') >>> T2Utils.clean(['arpDecode', 'txtSink']) >>> T2Utils.clean('/tmp/myPlugins.load') >>> T2Utils.clean('all') """ cmd = [T2Utils.T2BUILD, '-y', '-c'] if plugin == 'all': cmd.append('-a') elif isinstance(plugin, list): cmd.extend(plugin) elif isfile(plugin): cmd.extend(['-b', plugin]) else: cmd.append(plugin) subprocess.run(cmd, capture_output=not verbose, check=True) @staticmethod def unload( plugin: Union[str, List[str]], plugin_folder: str = None, verbose: bool = False ): """Remove (unload) a plugin from the plugin folder. Parameters ---------- plugin : str or list of str `plugin` can be: - a single plugin, e.g., `'pluginName'` - a list of plugins, e.g., `['pluginName1', 'pluginName2']` - a plugin loading list, e.g., `'myPlugins.txt'` - `all` to apply the operation to all available plugins plugin_folder : str, default: None The plugin folder from where the plugin must be unloaded. If `None`, use the default plugin folder (`'$HOME/.tranalyzer/plugins'`). verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ subprocess.CalledProcessError If the process exited with a non-zero exit code. 
Notes ----- Equivalent to: - `$ t2build -u pluginName` - `$ t2build -u pluginName1 pluginName2` - `$ t2build -u -b myPlugins.txt` - `$ t2build -u -a` Examples -------- >>> T2Utils.unload('arpDecode') >>> T2Utils.unload(['arpDecode', 'txtSink']) >>> T2Utils.unload('/tmp/myPlugins.load') >>> T2Utils.unload('all') """ cmd = [T2Utils.T2BUILD, '-y', '-u'] if plugin_folder: cmd.extend(['-p', plugin_folder]) if plugin == 'all': cmd.append('-a') elif isinstance(plugin, list): cmd.extend(plugin) elif isfile(plugin): cmd.extend(['-b', plugin]) else: cmd.append(plugin) subprocess.run(cmd, capture_output=not verbose, check=True) @staticmethod def plugin_description( plugin: str ) -> str: """Return a short description of a plugin. Parameters ---------- plugin : str The name of a plugin. Returns ------- str The description of the plugin. Raises ------ NameError If the plugin could not be found. Notes ----- Equivalent to: `$ t2plugin -l | awk '$1 == "plugin" { sub("^" $1 FS "+" $2 FS "+", ""); print }'`. Examples -------- >>> T2Utils.plugin_description('arpDecode') 'Address Resolution Protocol (ARP)' """ if plugin not in T2Utils.valid_plugin_names(): raise NameError(f'Plugin {plugin} could not be found') cmd = [T2Utils.T2PLUGIN, '-l'] out = subprocess.run(cmd, text=True, stdout=subprocess.PIPE).stdout for line in out.splitlines(): cols = line.split() if len(cols) > 2 and cols[0] == plugin: return ' '.join(cols[2:]) @staticmethod def plugin_number( plugin: str ) -> str: """Return the plugin number of a plugin. Parameters ---------- plugin : str The name of a plugin. Returns ------- str The plugin number. Raises ------ NameError If the plugin could not be found. 
Notes ----- Equivalent to: `$ t2plugin -l | awk '$1 == "pluginName" { print $2; exit }'` Examples -------- >>> T2Utils.plugin_number('arpDecode') '179' """ if plugin not in T2Utils.valid_plugin_names(): raise NameError(f'Plugin {plugin} could not be found') cmd = [T2Utils.T2PLUGIN, '-l'] out = subprocess.run(cmd, text=True, stdout=subprocess.PIPE).stdout for line in out.splitlines(): cols = line.split() if len(cols) > 2 and cols[0] == plugin: return cols[1] @staticmethod def plugins( category: Union[str, List[str]] = None ) -> List[str]: """Alphabetically List all the available plugins. Parameters ---------- category : str or list of str, default: None `category` can be used to specify the types of plugins to list: - `'g'` : global - `'b'` : basic - `'l2'`: layer 2 - `'l4'`: layer 3/4 - `'l7'`: layer 7 - `'a'` : application - `'m'` : math - `'c'` : classifier - `'o'` : output - `None`: all Returns ------- list of str List of all available plugins. Examples -------- >>> T2Utils.plugins() >>> T2Utils.plugins('g') ['protoStats'] >>> T2Utils.plugins(['g', 'l2']) ['arpDecode', 'cdpDecode', 'lldpDecode', 'protoStats', 'stpDecode', 'vtpDecode'] """ cmd = [T2Utils.T2PLUGIN, '-H', '-N'] if isinstance(category, list): for c in category: cmd.append(f'-l={c}') elif category: cmd.append(f'-l={category}') elif T2Utils._all_plugins: return T2Utils._all_plugins else: cmd.append('-l') out = subprocess.run(cmd, text=True, stdout=subprocess.PIPE).stdout sorted_list = sorted(out.splitlines()) if not category: T2Utils._all_plugins = sorted_list return sorted_list @staticmethod def list_plugins( infile: str = None ) -> List[str]: """List the active plugins in `infile`. Parameters ---------- infile : str, default: None Name of a plugin loading list. If `None`, default to `'plugins.load'` in the plugin folder. Returns ------- list of str List of active plugins in `infile`. Notes ----- Equivalent to: `$ t2conf -S [file]`. 
Examples -------- >>> T2Utils.list_plugins() ['arpDecode', 'txtSink'] >>> T2Utils.list_plugins('/tmp/myPlugins.load') ['arpDecode', 'txtSink'] """ cmd = [T2Utils.T2CONF, '-y', '-S'] if infile: cmd.append(infile) out = subprocess.run(cmd, text=True, stdout=subprocess.PIPE).stdout return sorted(out.splitlines()) @staticmethod def create_pcap_list( pcaps: List[str], outfile: str = None ): """Create a loading list of pcaps. Parameters ---------- pcaps : list of str List of pcap files. outfile : str, default: None Name of the file to save. If `None`, default to `/tmp/pcap_list.txt'`. Notes ----- Equivalent to: `$ t2caplist pcaps > outfile`. Examples -------- >>> T2Utils.create_pcap_list(['file1.pcap', 'file2.pcap']) >>> T2Utils.create_pcap_list(['file1.pcap', 'file2.pcap'], '/tmp/myPcaps.txt') """ if not outfile: outfile = '/tmp/pcap_list.txt' with open(outfile, 'w') as f: for pcap in pcaps: f.write(pcap + '\n') @staticmethod def create_plugin_list( plugins: List[str], outfile: str = None, verbose: bool = False ): """Create a loading list of plugins. Parameters ---------- plugins : list of str List of plugin names. outfile : str, default: None Name of the file to save. If `None`, default to `'plugins.load'` in the plugin folder. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ subprocess.CalledProcessError If the process exited with a non-zero exit code (if `plugins is not None`). Notes ----- Equivalent to: `$ t2conf -L plugin1 plugin2 ...`. 
Examples -------- >>> T2Utils.create_plugin_list(['arpDecode', 'txtSink']) >>> T2Utils.create_plugin_list(['arpDecode', 'txtSink'], '/tmp/myPlugins.txt') """ if outfile: # simpler format, only active plugins with open(outfile, 'w') as f: for plugin in plugins: f.write(plugin + '\n') else: cmd = [T2Utils.T2CONF, '-y', '-L'] cmd.extend(plugins) subprocess.run(cmd, capture_output=not verbose, check=True) @staticmethod def tawk( program: str = None, filename: str = None, options: List[str] = None ) -> str: """Call `tawk` with the `program`, `options` and `filename` specified. Parameters ---------- program : str, default: None A `tawk` program, e.g., `"$dstPort == 80 { print tuple5() }"`. filename : str, default: None Name of the file to analyze. options : list of str, default: None List of options to pass to `tawk`. Returns ------- str The result of the `tawk` command. Raises ------ RuntimeError If `filename` was specified, but the file does not exist. subprocess.CalledProcessError If the process exited with a non-zero exit code. Notes ----- Equivalent to: `$ tawk [options] 'program' filename`. Examples -------- >>> result = T2Utils.tawk(None, None, ['-V', 'flowStat=0x1234']) >>> print(result) >>> T2Utils.tawk('host("1.2.3.4") { aggr(proto()) }', 'file.txt') """ if filename and not isfile(filename): raise RuntimeError(f"Input file '{filename}' does not exist") cmd = [T2Utils.TAWK] if program: cmd.append(program) if options: cmd.extend(options) if filename: cmd.append(filename) result = subprocess.run(cmd, capture_output=True, check=True) result = result.stdout.decode('utf8').strip() return result @staticmethod def follow_stream( filename: str, flow: int, output_format: Union[int, str] = 2, direction: str = None, payload_format: Union[int, str] = 4, reassembly: bool = True, colors: bool = True ) -> Union[str, List[str], List[bytearray], List[Dict[str, Any]]]: """Call `tawk` `follow_stream()` function with the specified options. 
Return the reassembled payload of flow with index `flow` and direction `direction`. Parameters ---------- filename : str Name of the file to analyze. flow : int Index (`flowInd`) of the flow to analyze. output_format : int or str, default: 2 The format to use for the output: - 0 or `'payload_only'` - 1 or `'info'`: Prefix each payload with packet/flow info - 2 or `'json'` - 3 or `'reconstruct'` direction : str, default: None The direction to follow (`'A'` or `'B'`). Use `None` to follow both directions. payload_format : int or str, default: 4 The format to use for the payload: - 0 or 'ascii' - 1 or 'hexdump' - 2, 'raw' or 'binary' - 3 or 'base64' - 4 or 'bytearray' reassembly : bool, default: True Analyze TCP sequence numbers, reassemble and reorder TCP segments. colors : bool, default: True Output colors. Returns ------- list of bytearray If `output_format` is 0 or 2 and `payload_format` is 4 or `'bytearray'`. list of dict If `output_format` is 2. list of str If `output_format` is 0 and `payload_format` is not 4 or `'bytearray'`. str If none of the above applied. Raises ------ RuntimeError - If `filename` could not be found. - If `output_format` or `payload_format` was out of bound. - If `direction` was not `'A'`, `'B'` or `None`. - If `output_format` was 3 or `'reconstruct'` and `direction` was not `'A'` or `'B'`. - If `payload_format` was 4 or `'bytearray'` and `output_format` was 1 or `'info'`. subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- T2Utils.tawk : Call `tawk` with the `program`, `options` and `filename` specified. Notes ----- Equivalent to: `$ tawk 'follow_stream(flow, output_format, direction, payload_format, not reassembly, not colors)' filename`. 
Examples -------- >>> result = T2Utils.follow_stream(1, payload_format='hexdump', direction='B') >>> print(result) """ if not filename: raise RuntimeError("Input file != None is required") if direction: if direction not in ('A', 'B'): raise RuntimeError(f"Invalid value for 'direction' parameter: '{direction}'") direction = f'"{direction}"' if output_format in (0, 'payload_only'): output_format = 0 elif output_format in (1, 'info'): output_format = 1 elif output_format in (2, 'json'): output_format = 2 elif output_format in (3, 'reconstruct'): output_format = 3 else: raise RuntimeError(f"Invalid value for 'output_format' parameter: '{output_format}'") if output_format == 3 and direction is None: raise RuntimeError("Payload reconstruction (output format 3 or 'reconstruct') requires a direction ('A' or 'B').") if payload_format in (0, 'ascii'): payload = 0 elif payload_format in (1, 'hexdump'): payload = 1 elif payload_format in (2, 'raw', 'binary'): payload = 2 elif payload_format in (4, 'bytearray'): if output_format == 1: raise RuntimeError("'payload_format' 4 or 'bytearray' cannot be specified with 'output_format' 1 or 'info'") payload = 2 colors = False elif payload_format in (3, 'base64'): payload = 3 else: raise RuntimeError(f"Invalid value for 'payload_format' parameter: '{payload_format}'") prog = ( 'follow_stream(' f'{flow},' f'{output_format},' f'{direction if direction else 0},' f'{payload},' f'{0 if reassembly else 1},' f'{0 if colors else 1}' ')' ) result = T2Utils.tawk(prog, filename) if output_format == 0: result = result.split('\n') if result else [] elif output_format == 2: result = result.split('\n') if result else [] result = [json.loads(s) if s else {} for s in result] if payload_format in (4, 'bytearray'): if output_format == 3: result = bytearray.fromhex(result) elif output_format == 0: result = [bytearray.fromhex(s) for s in result] elif output_format == 2: for packet in result: packet['payload'] = bytearray.fromhex(packet['payload']) return 
result @staticmethod def load_plugins( plugin: Union[str, List[str]] = None ): """Load plugins as `T2Plugin` objects. The plugins will be available as, e.g., ``T2Utils.pluginName``. Parameters ---------- plugin : str or list of str, default: None Name of the plugin(s) to load. `plugin` can be: - a plugin name - a list of plugin names - `None` or 'all' (slow) Raises ------ NameError If a plugin could not be found. Examples -------- >>> T2Utils.load_plugins('arpDecode') >>> T2Utils.load_plugins(['arpDecode', 'txtSink']) >>> T2Utils.load_plugins() >>> T2Utils.arpDecode <t2py.T2Plugin.T2Plugin object at 0x16055a370> """ from .T2Plugin import T2Plugin all_plugins = T2Utils.valid_plugin_names() if isinstance(plugin, list): plugins = plugin elif not plugin or plugin == 'all': plugins = all_plugins else: plugins = [plugin] for p in plugins: if p not in all_plugins: raise NameError(f'Plugin {p} could not be found') else: setattr(T2Utils, p, T2Plugin(p)) @staticmethod def _tsv_to_list( filename: str, delimiter: str = '\t' ) -> List[Dict[str, Any]]: """Convert a tsv/csv file to a python list of JSON objects. Parameters ---------- filename : str The name of the file to load. delimiter : str, default: '\\t' The delimiter used to separate columns in the input file. Returns ------- list of dict A list of dictionary, whose entries are the column names and their associated values. """ import csv data = [] with open(filename) as f: reader = csv.DictReader(f, delimiter=delimiter) reader.fieldnames[0] = reader.fieldnames[0].lstrip('%').strip() for flow in reader: data.append(flow) return data @staticmethod def to_json_array( infile: str, delimiter: str = '\t' ) -> List[Dict[str, Any]]: """Convert a flow or packet file to an array of JSON objects. Parameters ---------- infile : str Path to a flow or packet file. delimiter : str, default: '\\t' Field delimiter used in the input file. Returns ------- list of dict The list of rows contained in `infile`. 
Each row is represented as a dict with the column names as key. Raises ------ json.decoder.JSONDecodeError If `infile` was a JSON file and the decoding process failed. Examples -------- >>> flows = T2Utils.to_json_array('/tmp/file_flows.txt') >>> flows [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...] """ data = [] if not infile.endswith('.json'): data = T2Utils._tsv_to_list(infile, delimiter) else: with open(infile) as f: for flow in f: if flow in ('[\n', ']\n'): continue data.append(json.loads(flow.strip(','))) return data @staticmethod def to_pandas( infile: str, delimiter: str = None ) -> "pandas.core.frame.DataFrame": """Convert a flow or packet file to pandas `DataFrame`. Parameters ---------- infile : str Path to a flow or packet file. delimiter : str, default: None Field delimiter used in the input file. Returns ------- pd.DataFrame DataFrame holding the tabular data stored in `infile`. Examples -------- >>> df = T2Utils.to_pandas('/tmp/file_flows.txt') >>> type(df) <class 'pandas.core.frame.DataFrame'> """ import pandas as pd if infile.endswith('.json'): return pd.DataFrame(T2Utils.to_json_array(infile)) else: return pd.read_table(infile, delimiter=delimiter) @staticmethod def to_pdf( pcap: str = None, flow_file: str = None, prefix: str = None, config: bool = True, reset_config: bool = True, open_pdf: bool = True, verbose: bool = False ): """Generate a PDF/LaTeX report from a PCAP or Tranalyzer flow file." Parameters ---------- pcap : str, default: None PCAP file to analyze. file : str, default: None Flow file to analyze. prefix : str, default: None Append 'prefix' to any output file produced. config : bool, default: True If `True`, configure and build Tranalyzer2 and the plugins as required. reset_config : bool, default: True If `True`, reset tranalyzer2 and the plugins configuration at the end. open_pdf : bool, default: True If `True`, automatically open the generated PDF. 
verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ RuntimeError - If one of the required parameter (pcap or flow_file) was not specified. - If multiple inputs (pcap and flow_file) were specified. subprocess.CalledProcessError If the process exited with a non-zero exit code. Notes ----- Equivalent to: `$ t2fm [-b] [-A] [-r pcap|-F file]`. Examples -------- >>> T2Utils.to_pdf(pcap='file.pcap') >>> T2Utils.to_pdf(flow_file='file_flows.txt') """ if not pcap and not flow_file: raise RuntimeError('One of (pcap, flow_file) must be specified.') elif pcap and flow_file: raise RuntimeError('Only one of (pcap, flow_file) must be specified.') cmd = [T2Utils.T2FM, '-y'] # Input if pcap: cmd.extend(['-r', pcap]) elif flow_file: cmd.extend(['-F', flow_file]) # Output if prefix: cmd.extend(['-w', prefix]) # Options if config: cmd.append('-b') if reset_config: cmd.append('--reset') if open_pdf: cmd.append('-A') subprocess.run(cmd, capture_output=not verbose, check=True) @staticmethod def network_interfaces() -> List[str]: """List the names of all available network interfaces. Returns ------- list of str The list of available network interfaces. Examples -------- >>> T2Utils.network_interfaces() ['lo0', 'eth0'] """ import socket return [iface[1] for iface in socket.if_nameindex()] @staticmethod def valid_plugin_names() -> List[str]: """Return a list of valid plugin names. Returns ------- list of str The list of valid plugin names. Examples -------- >>> T2Utils.valid_plugin_names() ['arpDecode', ..., 'tranalyzer2', ...] """ valid_plugins = ['tranalyzer2'] valid_plugins.extend(T2Utils.plugins()) return sorted(valid_plugins)
Class variables
var T2BUILD : str
-
The path to the
autogen.sh
script, i.e., $T2HOME/autogen.sh
. var T2CONF : str
-
The path to the
t2conf
script, i.e., $T2HOME/scripts/t2conf/t2conf
. var T2FM : str
-
The path to the
t2fm
script, i.e., $T2HOME/scripts/t2fm/t2fm
. var T2HOME : str
-
The path to Tranalyzer home folder.
var T2PLHOME : str
-
The path to Tranalyzer plugins folder, i.e.,
$T2HOME/plugins
. var T2PLUGIN : str
-
The path to the
t2plugin
script, i.e., $T2HOME/scripts/t2plugin
. var TAWK : str
-
The path to the
tawk
script, i.e., $T2HOME/scripts/tawk/tawk
. var set_config
Static methods
def apply_config(plugin: str, infile: str = None, verbose: bool = False)
-
Apply a configuration file to the sources.
Parameters
plugin
:str
- The name of a plugin.
infile
:str
, default: None
- The name of the configuration file to apply.
If
None
, default to 'pluginName.config'
in the plugin home folder. verbose
:bool
, default: False
-
- If
True
, print the output (stdout
and stderr
) of the command. - If
False
, do not print anything.
- If
Raises
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
See Also
T2Utils.generate_config()
- Generate configuration files for plugins
Notes
Equivalent to:
$ t2conf pluginName -C [file|auto]
.Examples
>>> T2Utils.apply_config('arpDecode') >>> T2Utils.apply_config('arpDecode', '/tmp/arpDecode.config')
Expand source code
@staticmethod
def apply_config(
    plugin: str,
    infile: str = None,
    verbose: bool = False
):
    """Apply a configuration file to the sources of `plugin`.

    Parameters
    ----------
    plugin : str
        The name of a plugin.
    infile : str, default: None
        The name of the configuration file to apply.
        If not given, the special value `'auto'` is handed to `t2conf`
        (documented upstream as `'pluginName.config'` in the plugin home folder).
    verbose : bool, default: False
        - If `True`, print the output (`stdout` and `stderr`) of the command.
        - If `False`, do not print anything.

    Raises
    ------
    NameError
        If the plugin could not be found.
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    See Also
    --------
    T2Utils.generate_config : Generate configuration files for plugins

    Notes
    -----
    Equivalent to: `$ t2conf pluginName -C [file|auto]`.

    Examples
    --------
    >>> T2Utils.apply_config('arpDecode')
    >>> T2Utils.apply_config('arpDecode', '/tmp/arpDecode.config')
    """
    known_plugins = T2Utils.valid_plugin_names()
    if plugin not in known_plugins:
        raise NameError(f'Plugin {plugin} could not be found')

    # Without an explicit file, let t2conf pick the configuration itself.
    config_source = infile or 'auto'
    subprocess.run(
        [T2Utils.T2CONF, '-y', plugin, '-C', config_source],
        capture_output=not verbose,
        check=True
    )
def build(plugin: Union[str, List[str]], plugin_folder: str = None, force_rebuild: bool = False, debug: bool = False, verbose: bool = False)
-
Build a plugin or Tranalyzer2.
Parameters
plugin
:str
orlist
ofstr
-
plugin
can be:- a single plugin, e.g.,
'pluginName'
- a list of plugins, e.g.,
['pluginName1', 'pluginName2']
- a plugin loading list, e.g.,
'myPlugins.txt'
all
to apply the operation to all available plugins
- a single plugin, e.g.,
plugin_folder
:str
, default: None
- Path to the plugin folder.
If
None
, default to'$HOME/.tranalyzer/plugins'
. force_rebuild
:bool
, default: False
- Force the rebuild of the configuration files (
$ t2build -r ...
). debug
:bool
, default: False
- Build the plugin(s) in debug mode.
verbose
:bool
, default: False
-
- If
True
, print the output (stdout
andstderr
) of the command. - If
False
, do not print anything.
- If
Raises
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
See Also
T2Utils.clean()
- Remove files generated by
T2Utils.build()
.
Notes
Equivalent to:
$ t2build pluginName
$ t2build pluginName1 pluginName2
$ t2build -b myPlugins.txt
$ t2build -b -a
Examples
>>> T2Utils.build('arpDecode') >>> T2Utils.build(['arpDecode', 'txtSink']) >>> T2Utils.build('/tmp/myPlugins.load') >>> T2Utils.build('all')
Expand source code
@staticmethod
def build(
    plugin: Union[str, List[str]],
    plugin_folder: str = None,
    force_rebuild: bool = False,
    debug: bool = False,
    verbose: bool = False
):
    """Build a plugin or Tranalyzer2.

    Parameters
    ----------
    plugin : str or list of str
        `plugin` can be:

        - a single plugin, e.g., `'pluginName'`
        - a list of plugins, e.g., `['pluginName1', 'pluginName2']`
          (the list passed in is never modified)
        - a plugin loading list, e.g., `'myPlugins.txt'`
        - `all` to apply the operation to all available plugins
    plugin_folder : str, default: None
        Path to the plugin folder (passed with `-p`).
        If `None`, default to `'$HOME/.tranalyzer/plugins'`.
    force_rebuild : bool, default: False
        Force the rebuild of the configuration files (`$ t2build -r ...`).
    debug : bool, default: False
        Build the plugin(s) in debug mode.
    verbose : bool, default: False
        - If `True`, print the output (`stdout` and `stderr`) of the command.
        - If `False`, do not print anything.

    Raises
    ------
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    See Also
    --------
    T2Utils.clean : Remove files generated by `T2Utils.build()`.

    Notes
    -----
    Equivalent to:

    - `$ t2build pluginName`
    - `$ t2build pluginName1 pluginName2`
    - `$ t2build -b myPlugins.txt`
    - `$ t2build -b -a`

    Examples
    --------
    >>> T2Utils.build('arpDecode')
    >>> T2Utils.build(['arpDecode', 'txtSink'])
    >>> T2Utils.build('/tmp/myPlugins.load')
    >>> T2Utils.build('all')
    """
    cmd = [T2Utils.T2BUILD, '-y']
    if debug:
        cmd.append('-d')
    if force_rebuild:
        cmd.append('-r')
    if plugin_folder:
        cmd.extend(['-p', plugin_folder])

    if plugin == 'tranalyzer2':
        cmd.extend(['-i', plugin])
    elif plugin == 'all':
        cmd.append('-a')
    elif isinstance(plugin, list):
        # Work on a copy: removing 'tranalyzer2' from the argument itself
        # would silently alter the list owned by the caller.
        targets = list(plugin)
        if 'tranalyzer2' in targets:
            # The core is requested through '-i' rather than by bare name.
            cmd.extend(['-i', 'tranalyzer2'])
            targets.remove('tranalyzer2')
        cmd.extend(targets)
    elif isfile(plugin):
        # An existing file is treated as a plugin loading list.
        cmd.extend(['-b', plugin])
    else:
        cmd.append(plugin)

    subprocess.run(cmd, capture_output=not verbose, check=True)
def clean(plugin: Union[str, List[str]], verbose: bool = False)
-
Remove files generated by
T2Utils.build()
.Parameters
plugin
:str
orlist
ofstr
-
plugin
can be:- a single plugin, e.g.,
'pluginName'
- a list of plugins, e.g.,
['pluginName1', 'pluginName2']
- a plugin loading list, e.g.,
'myPlugins.txt'
all
to apply the operation to all available plugins
- a single plugin, e.g.,
verbose
:bool
, default: False
-
- If
True
, print the output (stdout
andstderr
) of the command. - If
False
, do not print anything.
- If
Raises
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
See Also
T2Utils.build()
- Build a plugin or Tranalyzer2.
Notes
Equivalent to:
$ t2build -c pluginName
$ t2build -c pluginName1 pluginName2
$ t2build -c -b myPlugins.txt
$ t2build -c -a
Examples
>>> T2Utils.clean('arpDecode') >>> T2Utils.clean(['arpDecode', 'txtSink']) >>> T2Utils.clean('/tmp/myPlugins.load') >>> T2Utils.clean('all')
Expand source code
@staticmethod
def clean(
    plugin: Union[str, List[str]],
    verbose: bool = False
):
    """Remove the files produced by `T2Utils.build()`.

    Parameters
    ----------
    plugin : str or list of str
        `plugin` can be:

        - a single plugin, e.g., `'pluginName'`
        - a list of plugins, e.g., `['pluginName1', 'pluginName2']`
        - a plugin loading list, e.g., `'myPlugins.txt'`
        - `all` to apply the operation to all available plugins
    verbose : bool, default: False
        - If `True`, print the output (`stdout` and `stderr`) of the command.
        - If `False`, do not print anything.

    Raises
    ------
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    See Also
    --------
    T2Utils.build : Build a plugin or Tranalyzer2.

    Notes
    -----
    Equivalent to:

    - `$ t2build -c pluginName`
    - `$ t2build -c pluginName1 pluginName2`
    - `$ t2build -c -b myPlugins.txt`
    - `$ t2build -c -a`

    Examples
    --------
    >>> T2Utils.clean('arpDecode')
    >>> T2Utils.clean(['arpDecode', 'txtSink'])
    >>> T2Utils.clean('/tmp/myPlugins.load')
    >>> T2Utils.clean('all')
    """
    # Translate the `plugin` argument into the trailing command-line arguments.
    if plugin == 'all':
        targets = ['-a']
    elif isinstance(plugin, list):
        targets = plugin
    elif isfile(plugin):
        # An existing file is treated as a plugin loading list.
        targets = ['-b', plugin]
    else:
        targets = [plugin]

    subprocess.run(
        [T2Utils.T2BUILD, '-y', '-c', *targets],
        capture_output=not verbose,
        check=True
    )
def create_pcap_list(pcaps: List[str], outfile: str = None)
-
Create a loading list of pcaps.
Parameters
pcaps
:list
ofstr
- List of pcap files.
outfile
:str
, default: None
- Name of the file to save.
If
None
, default to '/tmp/pcap_list.txt'
.
Notes
Equivalent to:
$ t2caplist pcaps > outfile
.Examples
>>> T2Utils.create_pcap_list(['file1.pcap', 'file2.pcap']) >>> T2Utils.create_pcap_list(['file1.pcap', 'file2.pcap'], '/tmp/myPcaps.txt')
Expand source code
@staticmethod def create_pcap_list( pcaps: List[str], outfile: str = None ): """Create a loading list of pcaps. Parameters ---------- pcaps : list of str List of pcap files. outfile : str, default: None Name of the file to save. If `None`, default to `/tmp/pcap_list.txt'`. Notes ----- Equivalent to: `$ t2caplist pcaps > outfile`. Examples -------- >>> T2Utils.create_pcap_list(['file1.pcap', 'file2.pcap']) >>> T2Utils.create_pcap_list(['file1.pcap', 'file2.pcap'], '/tmp/myPcaps.txt') """ if not outfile: outfile = '/tmp/pcap_list.txt' with open(outfile, 'w') as f: for pcap in pcaps: f.write(pcap + '\n')
def create_plugin_list(plugins: List[str], outfile: str = None, verbose: bool = False)
-
Create a loading list of plugins.
Parameters
plugins
:list
ofstr
- List of plugin names.
outfile
:str
, default: None
- Name of the file to save.
If
None
, default to'plugins.load'
in the plugin folder. verbose
:bool
, default: False
-
- If
True
, print the output (stdout
andstderr
) of the command. - If
False
, do not print anything.
- If
Raises
subprocess.CalledProcessError
- If the process exited with a non-zero exit code (if
outfile is None
).
Notes
Equivalent to:
$ t2conf -L plugin1 plugin2 ...
.Examples
>>> T2Utils.create_plugin_list(['arpDecode', 'txtSink']) >>> T2Utils.create_plugin_list(['arpDecode', 'txtSink'], '/tmp/myPlugins.txt')
Expand source code
@staticmethod def create_plugin_list( plugins: List[str], outfile: str = None, verbose: bool = False ): """Create a loading list of plugins. Parameters ---------- plugins : list of str List of plugin names. outfile : str, default: None Name of the file to save. If `None`, default to `'plugins.load'` in the plugin folder. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ subprocess.CalledProcessError If the process exited with a non-zero exit code (if `plugins is not None`). Notes ----- Equivalent to: `$ t2conf -L plugin1 plugin2 ...`. Examples -------- >>> T2Utils.create_plugin_list(['arpDecode', 'txtSink']) >>> T2Utils.create_plugin_list(['arpDecode', 'txtSink'], '/tmp/myPlugins.txt') """ if outfile: # simpler format, only active plugins with open(outfile, 'w') as f: for plugin in plugins: f.write(plugin + '\n') else: cmd = [T2Utils.T2CONF, '-y', '-L'] cmd.extend(plugins) subprocess.run(cmd, capture_output=not verbose, check=True)
def follow_stream(filename: str, flow: int, output_format: Union[int, str] = 2, direction: str = None, payload_format: Union[int, str] = 4, reassembly: bool = True, colors: bool = True) ‑> Union[str, List[str], List[bytearray], List[Dict[str, Any]]]
-
Call
tawk
follow_stream()
function with the specified options. Return the reassembled payload of flow with index
flow
and direction direction
.Parameters
filename
:str
- Name of the file to analyze.
flow
:int
- Index (
flowInd
) of the flow to analyze. output_format
:int
orstr
, default: 2
-
The format to use for the output:
- 0 or
'payload_only'
- 1 or
'info'
: Prefix each payload with packet/flow info - 2 or
'json'
- 3 or
'reconstruct'
- 0 or
direction
:str
, default: None
- The direction to follow (
'A'
or'B'
). UseNone
to follow both directions. payload_format
:int
orstr
, default: 4
-
The format to use for the payload:
- 0 or 'ascii'
- 1 or 'hexdump'
- 2, 'raw' or 'binary'
- 3 or 'base64'
- 4 or 'bytearray'
reassembly
:bool
, default: True
- Analyze TCP sequence numbers, reassemble and reorder TCP segments.
colors
:bool
, default: True
- Output colors.
Returns
list
ofbytearray
- If
output_format
is 0 and payload_format
is 4 or'bytearray'
. list
ofdict
- If
output_format
is 2. list
ofstr
- If
output_format
is 0 andpayload_format
is not 4 or'bytearray'
. str
- If none of the above applied.
Raises
RuntimeError
-
- If
filename
could not be found. - If
output_format
orpayload_format
was out of bound. - If
direction
was not'A'
,'B'
orNone
. - If
output_format
was 3 or'reconstruct'
anddirection
was not'A'
or'B'
. - If
payload_format
was 4 or'bytearray'
andoutput_format
was 1 or'info'
.
- If
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
See Also
T2Utils.tawk()
- Call
tawk
with theprogram
,options
andfilename
specified.
Notes
Equivalent to:
$ tawk 'follow_stream(flow, output_format, direction, payload_format, not reassembly, not colors)' filename
.Examples
>>> result = T2Utils.follow_stream('file.txt', 1, payload_format='hexdump', direction='B') >>> print(result)
Expand source code
@staticmethod def follow_stream( filename: str, flow: int, output_format: Union[int, str] = 2, direction: str = None, payload_format: Union[int, str] = 4, reassembly: bool = True, colors: bool = True ) -> Union[str, List[str], List[bytearray], List[Dict[str, Any]]]: """Call `tawk` `follow_stream()` function with the specified options. Return the reassembled payload of flow with index `flow` and direction `direction`. Parameters ---------- filename : str Name of the file to analyze. flow : int Index (`flowInd`) of the flow to analyze. output_format : int or str, default: 2 The format to use for the output: - 0 or `'payload_only'` - 1 or `'info'`: Prefix each payload with packet/flow info - 2 or `'json'` - 3 or `'reconstruct'` direction : str, default: None The direction to follow (`'A'` or `'B'`). Use `None` to follow both directions. payload_format : int or str, default: 4 The format to use for the payload: - 0 or 'ascii' - 1 or 'hexdump' - 2, 'raw' or 'binary' - 3 or 'base64' - 4 or 'bytearray' reassembly : bool, default: True Analyze TCP sequence numbers, reassemble and reorder TCP segments. colors : bool, default: True Output colors. Returns ------- list of bytearray If `output_format` is 0 or 2 and `payload_format` is 4 or `'bytearray'`. list of dict If `output_format` is 2. list of str If `output_format` is 0 and `payload_format` is not 4 or `'bytearray'`. str If none of the above applied. Raises ------ RuntimeError - If `filename` could not be found. - If `output_format` or `payload_format` was out of bound. - If `direction` was not `'A'`, `'B'` or `None`. - If `output_format` was 3 or `'reconstruct'` and `direction` was not `'A'` or `'B'`. - If `payload_format` was 4 or `'bytearray'` and `output_format` was 1 or `'info'`. subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- T2Utils.tawk : Call `tawk` with the `program`, `options` and `filename` specified. 
Notes ----- Equivalent to: `$ tawk 'follow_stream(flow, output_format, direction, payload_format, not reassembly, not colors)' filename`. Examples -------- >>> result = T2Utils.follow_stream(1, payload_format='hexdump', direction='B') >>> print(result) """ if not filename: raise RuntimeError("Input file != None is required") if direction: if direction not in ('A', 'B'): raise RuntimeError(f"Invalid value for 'direction' parameter: '{direction}'") direction = f'"{direction}"' if output_format in (0, 'payload_only'): output_format = 0 elif output_format in (1, 'info'): output_format = 1 elif output_format in (2, 'json'): output_format = 2 elif output_format in (3, 'reconstruct'): output_format = 3 else: raise RuntimeError(f"Invalid value for 'output_format' parameter: '{output_format}'") if output_format == 3 and direction is None: raise RuntimeError("Payload reconstruction (output format 3 or 'reconstruct') requires a direction ('A' or 'B').") if payload_format in (0, 'ascii'): payload = 0 elif payload_format in (1, 'hexdump'): payload = 1 elif payload_format in (2, 'raw', 'binary'): payload = 2 elif payload_format in (4, 'bytearray'): if output_format == 1: raise RuntimeError("'payload_format' 4 or 'bytearray' cannot be specified with 'output_format' 1 or 'info'") payload = 2 colors = False elif payload_format in (3, 'base64'): payload = 3 else: raise RuntimeError(f"Invalid value for 'payload_format' parameter: '{payload_format}'") prog = ( 'follow_stream(' f'{flow},' f'{output_format},' f'{direction if direction else 0},' f'{payload},' f'{0 if reassembly else 1},' f'{0 if colors else 1}' ')' ) result = T2Utils.tawk(prog, filename) if output_format == 0: result = result.split('\n') if result else [] elif output_format == 2: result = result.split('\n') if result else [] result = [json.loads(s) if s else {} for s in result] if payload_format in (4, 'bytearray'): if output_format == 3: result = bytearray.fromhex(result) elif output_format == 0: result = 
[bytearray.fromhex(s) for s in result] elif output_format == 2: for packet in result: packet['payload'] = bytearray.fromhex(packet['payload']) return result
def generate_config(plugin: str, outfile: str = None, verbose: bool = False)
-
Generate a configuration file for
plugin
.Parameters
plugin
:str
- The name of a plugin.
outfile
:str
, default: None
- The filename where to save the configuration.
If
None
, default to'pluginName.config'
in the plugin home folder. verbose
:bool
, default: False
-
- If
True
, print the output (stdout
andstderr
) of the command. - If
False
, do not print anything.
- If
Raises
NameError
- If the plugin could not be found.
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
See Also
T2Utils.apply_config()
- Apply a configuration file to the sources.
Notes
Equivalent to:
$ t2conf pluginName -g [file]
.Examples
>>> T2Utils.generate_config('arpDecode') >>> T2Utils.generate_config('arpDecode', '/tmp/arpDecode.config')
Expand source code
@staticmethod
def generate_config(
    plugin: str,
    outfile: str = None,
    verbose: bool = False
):
    """Generate a configuration file for `plugin`.

    Parameters
    ----------
    plugin : str
        The name of a plugin.
    outfile : str, default: None
        The filename where to save the configuration.
        If `None`, no filename is passed to `t2conf` (documented upstream as
        defaulting to `'pluginName.config'` in the plugin home folder).
    verbose : bool, default: False
        - If `True`, print the output (`stdout` and `stderr`) of the command.
        - If `False`, do not print anything.

    Raises
    ------
    NameError
        If the plugin could not be found.
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    See Also
    --------
    T2Utils.apply_config : Apply a configuration file to the sources.

    Notes
    -----
    Equivalent to: `$ t2conf pluginName -g [file]`.

    Examples
    --------
    >>> T2Utils.generate_config('arpDecode')
    >>> T2Utils.generate_config('arpDecode', '/tmp/arpDecode.config')
    """
    known_plugins = T2Utils.valid_plugin_names()
    if plugin not in known_plugins:
        raise NameError(f'Plugin {plugin} could not be found')

    # The destination is optional: only append it when one was requested.
    destination = [outfile] if outfile else []
    subprocess.run(
        [T2Utils.T2CONF, '-y', plugin, '-g', *destination],
        capture_output=not verbose,
        check=True
    )
def get_config(plugin: str, name: str, infile: str = None) ‑> Any
-
Get the value of a define from a configuration or header file.
Parameters
plugin
:str
- The name of a plugin.
name
:str
- The name of a configuration flag.
infile
:str
, default: None
- Specify from which file (absolute path) to get the value from, e.g.,
'$T2PLHOME/pluginName/pluginName.cfg'
. Use the special value'default'
to get the value fromdefault.config
. If'source'
orNone
, get the value from the header file.
Raises
NameError
- If the plugin could not be found.
See Also
T2Utils.get_config_from_source()
- Alias for
T2Utils.get_config(plugin, name, 'source')
T2Utils.get_default()
- Alias for
T2Utils.get_config(plugin, name, 'default')
.
Notes
Equivalent to:
$ t2conf pluginName -G name [-g infile]
.Examples
>>> T2Utils.get_config('arpDecode', 'MAX_IP') 10
Expand source code
@staticmethod
def get_config(
    plugin: str,
    name: str,
    infile: str = None
) -> Any:
    """Get the value of a define from a configuration or header file.

    Parameters
    ----------
    plugin : str
        The name of a plugin.
    name : str
        The name of a configuration flag.
    infile : str, default: None
        Specify from which file (absolute path) to get the value from, e.g.,
        `'$T2PLHOME/pluginName/pluginName.cfg'`.
        Use the special value `'default'` to get the value from `default.config`.
        If `'source'` or `None`, get the value from the header file.

    Returns
    -------
    int
        If the value was `'yes'` (1), `'no'` (0) or an optionally negative
        sequence of digits.
    float
        If the value was an optionally negative decimal number.
    str
        For any other value; surrounding single or double quotes are removed.
    None
        If `t2conf` produced no `name = value` output.

    Raises
    ------
    NameError
        If the plugin could not be found.

    See Also
    --------
    T2Utils.get_config_from_source : Alias for `T2Utils.get_config(plugin, name, 'source')`
    T2Utils.get_default : Alias for `T2Utils.get_config(plugin, name, 'default')`.

    Notes
    -----
    Equivalent to: `$ t2conf pluginName -G name [-g infile]`.

    Examples
    --------
    >>> T2Utils.get_config('arpDecode', 'MAX_IP')
    10
    """
    if plugin not in T2Utils.valid_plugin_names():
        raise NameError(f'Plugin {plugin} could not be found')

    cmd = [T2Utils.T2CONF, '-y', plugin, '-G', name]
    if infile and infile != 'source':
        cmd.extend(['-g', infile])

    config = subprocess.run(cmd, text=True, stdout=subprocess.PIPE).stdout

    # Split on the FIRST '=' only: the value itself may contain '=' (e.g. a
    # quoted string), and output without any '=' must not raise IndexError.
    _, separator, raw_value = (config or '').partition('=')
    if not separator:
        return None

    val = raw_value.strip()
    if not val:
        return val

    # Convert the textual value to the closest Python type.
    if val == 'no':
        return 0
    if val == 'yes':
        return 1

    unsigned = val.lstrip('-')
    if unsigned.isdigit():
        return int(val)
    if unsigned.replace('.', '', 1).isdigit():
        return float(val)
    if val.startswith('"'):
        return val.strip('"')
    if val.startswith("'"):
        return val.strip("'")
    return val
def get_config_from_source(plugin: str, name: str) ‑> Any
-
Get the value of a define from the header file, e.g.,
pluginName.h
.Parameters
plugin
:str
- The name of a plugin.
name
:str
- The name of a configuration flag.
Raises
NameError
- If the plugin could not be found.
See Also
T2Utils.get_config()
- Get the value of a define from a configuration or header file.
T2Utils.get_default()
- Get the default value of a define.
Notes
Equivalent to:
$ t2conf pluginName -G name
.Alias for
T2Utils.get_config(plugin, name, infile='source')
.Examples
>>> T2Utils.get_config_from_source('arpDecode', 'MAX_IP') 10
Expand source code
@staticmethod
def get_config_from_source(
    plugin: str,
    name: str
) -> Any:
    """Read the value of a define straight from the plugin header file (e.g., `pluginName.h`).

    Parameters
    ----------
    plugin : str
        The name of a plugin.
    name : str
        The name of a configuration flag.

    Returns
    -------
    Any
        Whatever `T2Utils.get_config` returns for `infile='source'`.

    Raises
    ------
    NameError
        If the plugin could not be found.

    See Also
    --------
    T2Utils.get_config : Get the value of a define from a configuration or header file.
    T2Utils.get_default : Get the default value of a define.

    Notes
    -----
    Equivalent to: `$ t2conf pluginName -G name`.

    Alias for `T2Utils.get_config(plugin, name, infile='source')`.

    Examples
    --------
    >>> T2Utils.get_config_from_source('arpDecode', 'MAX_IP')
    10
    """
    value = T2Utils.get_config(plugin, name, infile='source')
    return value
def get_default(plugin: str, name: str) ‑> Any
-
Get the default value of a define.
Parameters
plugin
:str
- The name of a plugin.
name
:str
- The name of a configuration flag.
Raises
NameError
- If the plugin could not be found.
See Also
T2Utils.get_config()
- Get the value of a define from a configuration or header file.
T2Utils.get_config_from_source()
- Get the value of a define from a header file.
Notes
Equivalent to:
$ t2conf pluginName -G name -g default
.Alias for
T2Utils.get_config(plugin, name, infile='default')
.Examples
>>> T2Utils.get_default('arpDecode', 'MAX_IP') 10
Expand source code
@staticmethod
def get_default(
    plugin: str,
    name: str
) -> Any:
    """Read the default value of a define.

    Parameters
    ----------
    plugin : str
        The name of a plugin.
    name : str
        The name of a configuration flag.

    Returns
    -------
    Any
        Whatever `T2Utils.get_config` returns for `infile='default'`.

    Raises
    ------
    NameError
        If the plugin could not be found.

    See Also
    --------
    T2Utils.get_config : Get the value of a define from a configuration or header file.
    T2Utils.get_config_from_source : Get the value of a define from a header file.

    Notes
    -----
    Equivalent to: `$ t2conf pluginName -G name -g default`.

    Alias for `T2Utils.get_config(plugin, name, infile='default')`.

    Examples
    --------
    >>> T2Utils.get_default('arpDecode', 'MAX_IP')
    10
    """
    value = T2Utils.get_config(plugin, name, infile='default')
    return value
def list_config(plugin: str) ‑> List[str]
-
List the configuration flags available for
plugin
.Parameters
plugin
:str
- The name of a plugin
Returns
list
ofstr
- The list of configuration flags available for the plugin.
Raises
NameError
- If the plugin could not be found.
Notes
Equivalent to:
$ t2conf pluginName -I
.Examples
>>> T2Utils.list_config('arpDecode') ['MAX_IP']
Expand source code
@staticmethod
def list_config(
    plugin: str
) -> List[str]:
    """Return the names of the configuration flags exposed by `plugin`.

    Parameters
    ----------
    plugin : str
        The name of a plugin

    Returns
    -------
    list of str
        The list of configuration flags available for the plugin
        (empty if the plugin has none).

    Raises
    ------
    NameError
        If the plugin could not be found.

    Notes
    -----
    Equivalent to: `$ t2conf pluginName -I`.

    Examples
    --------
    >>> T2Utils.list_config('arpDecode')
    ['MAX_IP']
    """
    if plugin not in T2Utils.valid_plugin_names():
        raise NameError(f'Plugin {plugin} could not be found')

    proc = subprocess.run(
        [T2Utils.T2CONF, '-y', plugin, '-I'],
        text=True,
        stdout=subprocess.PIPE,
    )
    output = proc.stdout.strip()
    if not output:
        return []

    names = [line.strip() for line in output.split('\n')]

    # t2conf reports the absence of flags with a single informational line
    if names == [f'No configuration flags found for {plugin}']:
        return []
    return names
def list_plugins(infile: str = None) ‑> List[str]
-
List the active plugins in
infile
.Parameters
infile
:str
, default: None
- Name of a plugin loading list.
If
None
, default to'plugins.load'
in the plugin folder.
Returns
list
ofstr
List of active plugins in
infile
.Notes
Equivalent to:
$ t2conf -S [file]
.Examples
>>> T2Utils.list_plugins() ['arpDecode', 'txtSink'] >>> T2Utils.list_plugins('/tmp/myPlugins.load') ['arpDecode', 'txtSink']
Expand source code
@staticmethod
def list_plugins(
    infile: str = None
) -> List[str]:
    """Return the sorted names of the plugins that are active in `infile`.

    Parameters
    ----------
    infile : str, default: None
        Name of a plugin loading list.
        If `None`, default to `'plugins.load'` in the plugin folder.

    Returns
    -------
    list of str
        List of active plugins in `infile`.

    Notes
    -----
    Equivalent to: `$ t2conf -S [file]`.

    Examples
    --------
    >>> T2Utils.list_plugins()
    ['arpDecode', 'txtSink']
    >>> T2Utils.list_plugins('/tmp/myPlugins.load')
    ['arpDecode', 'txtSink']
    """
    extra_args = [infile] if infile else []
    proc = subprocess.run(
        [T2Utils.T2CONF, '-y', '-S'] + extra_args,
        text=True,
        stdout=subprocess.PIPE,
    )
    active = proc.stdout.splitlines()
    active.sort()
    return active
def load_plugins(plugin: Union[str, List[str]] = None)
-
Load plugins as
T2Plugin
objects.The plugins will be available as, e.g.,
T2Utils.pluginName
.Parameters
plugin
:str
orlist
ofstr
, default: None
-
Name of the plugin(s) to load.
plugin
can be:- a plugin name
- a list of plugin names
None
or 'all' (slow)
Raises
NameError
- If a plugin could not be found.
Examples
>>> T2Utils.load_plugins('arpDecode') >>> T2Utils.load_plugins(['arpDecode', 'txtSink']) >>> T2Utils.load_plugins() >>> T2Utils.arpDecode <t2py.T2Plugin.T2Plugin object at 0x16055a370>
Expand source code
@staticmethod
def load_plugins(
    plugin: Union[str, List[str]] = None
):
    """Instantiate plugins as `T2Plugin` objects and attach them to `T2Utils`.

    Each loaded plugin becomes a class attribute, e.g., ``T2Utils.pluginName``.

    Parameters
    ----------
    plugin : str or list of str, default: None
        Name of the plugin(s) to load.

        `plugin` can be:

        - a plugin name
        - a list of plugin names
        - `None` or 'all' (slow)

    Raises
    ------
    NameError
        If a plugin could not be found. Plugins listed before the
        unknown one have already been loaded at that point.

    Examples
    --------
    >>> T2Utils.load_plugins('arpDecode')
    >>> T2Utils.load_plugins(['arpDecode', 'txtSink'])
    >>> T2Utils.load_plugins()
    >>> T2Utils.arpDecode
    <t2py.T2Plugin.T2Plugin object at 0x16055a370>
    """
    from .T2Plugin import T2Plugin

    known = T2Utils.valid_plugin_names()

    if isinstance(plugin, list):
        requested = plugin
    elif plugin and plugin != 'all':
        requested = [plugin]
    else:
        # None, empty or 'all': load every known plugin
        requested = known

    for plugin_name in requested:
        if plugin_name not in known:
            raise NameError(f'Plugin {plugin_name} could not be found')
        setattr(T2Utils, plugin_name, T2Plugin(plugin_name))
def network_interfaces() ‑> List[str]
-
List the names of all available network interfaces.
Returns
list
ofstr
- The list of available network interfaces.
Examples
>>> T2Utils.network_interfaces() ['lo0', 'eth0']
Expand source code
@staticmethod def network_interfaces() -> List[str]: """List the names of all available network interfaces. Returns ------- list of str The list of available network interfaces. Examples -------- >>> T2Utils.network_interfaces() ['lo0', 'eth0'] """ import socket return [iface[1] for iface in socket.if_nameindex()]
def plugin_description(plugin: str) ‑> str
-
Return a short description of a plugin.
Parameters
plugin
:str
- The name of a plugin.
Returns
str
- The description of the plugin.
Raises
NameError
- If the plugin could not be found.
Notes
Equivalent to:
$ t2plugin -l | awk '$1 == "plugin" { sub("^" $1 FS "+" $2 FS "+", ""); print }'
.Examples
>>> T2Utils.plugin_description('arpDecode') 'Address Resolution Protocol (ARP)'
Expand source code
@staticmethod
def plugin_description(
    plugin: str
) -> str:
    """Look up the short description of a plugin in the output of `t2plugin -l`.

    Parameters
    ----------
    plugin : str
        The name of a plugin.

    Returns
    -------
    str
        The description of the plugin, i.e., every column after the
        plugin name and number, joined with single spaces.
        `None` if no matching line was found in the listing.

    Raises
    ------
    NameError
        If the plugin could not be found.

    Notes
    -----
    Equivalent to: `$ t2plugin -l | awk '$1 == "plugin" { sub("^" $1 FS "+" $2 FS "+", ""); print }'`.

    Examples
    --------
    >>> T2Utils.plugin_description('arpDecode')
    'Address Resolution Protocol (ARP)'
    """
    if plugin not in T2Utils.valid_plugin_names():
        raise NameError(f'Plugin {plugin} could not be found')

    listing = subprocess.run(
        [T2Utils.T2PLUGIN, '-l'],
        text=True,
        stdout=subprocess.PIPE,
    ).stdout

    for entry in listing.splitlines():
        fields = entry.split()
        # Need at least: name, number and one word of description
        if len(fields) < 3 or fields[0] != plugin:
            continue
        return ' '.join(fields[2:])

    return None
def plugin_number(plugin: str) ‑> str
-
Return the plugin number of a plugin.
Parameters
plugin
:str
- The name of a plugin.
Returns
str
- The plugin number.
Raises
NameError
- If the plugin could not be found.
Notes
Equivalent to:
$ t2plugin -l | awk '$1 == "pluginName" { print $2; exit }'
Examples
>>> T2Utils.plugin_number('arpDecode') '179'
Expand source code
@staticmethod
def plugin_number(
    plugin: str
) -> str:
    """Look up the plugin number of a plugin in the output of `t2plugin -l`.

    Parameters
    ----------
    plugin : str
        The name of a plugin.

    Returns
    -------
    str
        The plugin number (second column of the matching line).
        `None` if no matching line was found in the listing.

    Raises
    ------
    NameError
        If the plugin could not be found.

    Notes
    -----
    Equivalent to: `$ t2plugin -l | awk '$1 == "pluginName" { print $2; exit }'`

    Examples
    --------
    >>> T2Utils.plugin_number('arpDecode')
    '179'
    """
    if plugin not in T2Utils.valid_plugin_names():
        raise NameError(f'Plugin {plugin} could not be found')

    listing = subprocess.run(
        [T2Utils.T2PLUGIN, '-l'],
        text=True,
        stdout=subprocess.PIPE,
    ).stdout

    for entry in listing.splitlines():
        fields = entry.split()
        # Only lines with a name, a number and a description are considered
        if len(fields) < 3 or fields[0] != plugin:
            continue
        return fields[1]

    return None
def plugins(category: Union[str, List[str]] = None) ‑> List[str]
-
List all the available plugins in alphabetical order.
Parameters
category
:str
orlist
ofstr
, default: None
-
category
can be used to specify the types of plugins to list:'g'
: global'b'
: basic'l2'
: layer 2'l4'
: layer 3/4'l7'
: layer 7'a'
: application'm'
: math'c'
: classifier'o'
: outputNone
: all
Returns
list
ofstr
- List of all available plugins.
Examples
>>> T2Utils.plugins() >>> T2Utils.plugins('g') ['protoStats'] >>> T2Utils.plugins(['g', 'l2']) ['arpDecode', 'cdpDecode', 'lldpDecode', 'protoStats', 'stpDecode', 'vtpDecode']
Expand source code
@staticmethod
def plugins(
    category: Union[str, List[str]] = None
) -> List[str]:
    """List all the available plugins in alphabetical order.

    Parameters
    ----------
    category : str or list of str, default: None
        `category` can be used to specify the types of plugins to list:

        - `'g'` : global
        - `'b'` : basic
        - `'l2'`: layer 2
        - `'l4'`: layer 3/4
        - `'l7'`: layer 7
        - `'a'` : application
        - `'m'` : math
        - `'c'` : classifier
        - `'o'` : output
        - `None`: all

        An empty string or an empty list is treated like `None`.

    Returns
    -------
    list of str
        List of all available plugins. The list is always a fresh object,
        so callers may modify it freely.

    Notes
    -----
    The unfiltered list is cached in `T2Utils._all_plugins` after the first
    call, so only the first call without `category` runs `t2plugin`.

    Examples
    --------
    >>> T2Utils.plugins()
    >>> T2Utils.plugins('g')
    ['protoStats']
    >>> T2Utils.plugins(['g', 'l2'])
    ['arpDecode', 'cdpDecode', 'lldpDecode', 'protoStats', 'stpDecode', 'vtpDecode']
    """
    if isinstance(category, list):
        categories = category
    elif category:
        categories = [category]
    else:
        categories = []

    cmd = [T2Utils.T2PLUGIN, '-H', '-N']

    if categories:
        cmd.extend(f'-l={c}' for c in categories)
    else:
        # No filter requested (None, '' or []): serve the cached full list
        # when available. Return a copy so that callers cannot corrupt it.
        if T2Utils._all_plugins:
            return list(T2Utils._all_plugins)
        cmd.append('-l')

    out = subprocess.run(cmd, text=True, stdout=subprocess.PIPE).stdout
    sorted_list = sorted(out.splitlines())

    if not categories:
        T2Utils._all_plugins = sorted_list
        return list(sorted_list)

    return sorted_list
def reset_config(plugin: Union[str, List[str]], name: Union[str, List[str]] = None, outfile: str = None, verbose: bool = False)
-
Reset all configuration flags from a plugin to their default values.
Parameters
plugin
:str
orlist
ofstr
-
plugin
can be:- a single plugin, e.g.,
'pluginName'
- a list of plugins, e.g.,
['pluginName1', 'pluginName2']
- a single plugin, e.g.,
name
:str
, default: None
- The name of a configuration flag to reset.
If
name
is set, only reset the value for the specified flag(s). outfile
:str
, default: None
- Path to a configuration file where the values will be reset.
Use
outfile='source'
to reset the value in the header file. verbose
:bool
, default: False
-
- If
True
, print the output (stdout
andstderr
) of the command. - If
False
, do not print anything.
- If
Raises
NameError
- If a plugin could not be found.
NotImplementedError
- If trying to reset specific flag(s) with multiple plugins.
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
See Also
T2Utils.set_default()
- Reset the value of a define to its default.
Notes
Equivalent to:
$ t2conf pluginName --reset [-g outfile]
.Alias for
T2Utils.set_default(plugin, name, outfile)
.Examples
>>> T2Utils.reset_config('arpDecode') >>> T2Utils.reset_config('arpDecode', 'MAX_IP') >>> T2Utils.reset_config(['arpDecode', 'txtSink']) >>> T2Utils.reset_config('arpDecode', 'MAX_IP', '/tmp/arpDecode.config')
Expand source code
@staticmethod
def reset_config(
    plugin: Union[str, List[str]],
    name: Union[str, List[str]] = None,
    outfile: str = None,
    verbose: bool = False
):
    """Reset the configuration flags of a plugin to their default values.

    Parameters
    ----------
    plugin : str or list of str
        `plugin` can be:

        - a single plugin, e.g., `'pluginName'`
        - a list of plugins, e.g., `['pluginName1', 'pluginName2']`
    name : str or list of str, default: None
        The name of a configuration flag to reset.
        If `name` is set, only reset the value for the specified flag(s).
    outfile : str, default: None
        Path to a configuration file where the values will be reset.
        Use `outfile='source'` to reset the value in the header file.
    verbose : bool, default: False
        - If `True`, print the output (`stdout` and `stderr`) of the command.
        - If `False`, do not print anything.

    Raises
    ------
    NameError
        If a plugin could not be found.
    NotImplementedError
        If trying to reset specific flag(s) with multiple plugins.
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    See Also
    --------
    T2Utils.set_default : Reset the value of a define to its default.

    Notes
    -----
    Equivalent to: `$ t2conf pluginName --reset [-g outfile]`.

    Alias for `T2Utils.set_default(plugin, name, outfile)`.

    Examples
    --------
    >>> T2Utils.reset_config('arpDecode')
    >>> T2Utils.reset_config('arpDecode', 'MAX_IP')
    >>> T2Utils.reset_config(['arpDecode', 'txtSink'])
    >>> T2Utils.reset_config('arpDecode', 'MAX_IP', '/tmp/arpDecode.config')
    """
    # All the work (and all the validation) is delegated to set_default()
    T2Utils.set_default(
        plugin,
        name,
        outfile=outfile,
        verbose=verbose,
    )
def run_tranalyzer(pcap: str = None, iface: str = None, pcap_list: Union[str, List[str]] = None, output_prefix: str = None, log_file: bool = False, monitoring_file: bool = False, packet_mode: bool = False, plugin_folder: str = None, loading_list: str = None, plugins: List[str] = None, bpf: str = None, t2_exec: str = None, timeout: int = None, verbose: bool = False)
-
Run Tranalyzer2.
Parameters
pcap
:str
, default: None
- Path to a pcap file.
iface
:str
, default: None
- Name of a network interface.
pcap_list
:str
orlist
ofstr
, default: None
-
- Path to a list of pcap files, e.g.,
'/tmp/myPcaps.txt'
. - Or list of path to pcap files, e.g.,
['file1.pcap', 'file2.pcap']
.
- Path to a list of pcap files, e.g.,
output_prefix
:str
, default: None
- If
None
, automatically derived from input. log_file
:bool
, default: False
- Save the final report in a
_log.txt
file. monitoring_file
:bool
, default: False
- Save the monitoring report in a
_monitoring.txt
file. packet_mode
:bool
, default: False
- Activate Tranalyzer2 packet mode.
plugin_folder
:str
, default: None
- Path to the plugin folder.
loading_list
:str
, default: None
- Path to a plugin loading list.
If set, the
plugins
parameter is ignored. plugins
:list
ofstr
, default: None
- The list of plugins to use.
If
None
, load the plugins available in the plugin folder. bpf
:str
, default: None
- A BPF filter.
t2_exec
:str
, default: None
- Path to the Tranalyzer2 executable.
If
None
, useT2Utils.t2_exec()
timeout
:int
, default: None
- Number of seconds after which to terminate the process.
If
None
, run forever or until the end of file is reached. verbose
:bool
, default: False
-
- If
True
, print the output (stdout
andstderr
) of the command. - If
False
, do not print anything.
- If
Raises
OSError
- If specified interface
iface
does not exist locally. RuntimeError
- If none or more than one input (
pcap
,pcap_list
,iface
) is specified. subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
subprocess.TimeoutExpired
- If
timeout
was specified and the process did not finish within that many seconds.
Examples
>>> T2Utils.run_tranalyzer(pcap='/tmp/file.pcap', output_prefix='/tmp/')
Expand source code
@staticmethod def run_tranalyzer( pcap: str = None, iface: str = None, pcap_list: Union[str, List[str]] = None, output_prefix: str = None, log_file: bool = False, monitoring_file: bool = False, packet_mode: bool = False, plugin_folder: str = None, loading_list: str = None, plugins: List[str] = None, bpf: str = None, t2_exec: str = None, timeout: int = None, verbose: bool = False ): """Run Tranalyzer2. Parameters ---------- pcap : str, default: None Path to a pcap file. iface : str, default: None Name of a network interface. pcap_list : str or list of str, default: None - Path to a list of pcap files, e.g., `'/tmp/myPcaps.txt'`. - Or list of path to pcap files, e.g., `['file1.pcap', 'file2.pcap']`. output_prefix : str, default: None If `None`, automatically derived from input. log_file : bool, default: False Save the final report in a `_log.txt` file. monitoring_file : bool, default: False Save the monitoring report in a `_monitoring.txt` file. packet_mode : bool, default: False Activate Tranalyzer2 packet mode. plugin_folder : str, default: None Path to the plugin folder. loading_list : str, default: None Path to a plugin loading list. If set, the `plugins` parameter is ignored. plugins : list of str, default: None The list of plugins to use. If `None`, load the plugins available in the plugin folder. bpf : str, default: None A BPF filter. t2_exec : str, default: None Path to the Tranalyzer2 executable. If `None`, use `T2Utils.t2_exec()` timeout : int, default: None Number of seconds after which to terminate the process. If `None`, run forever or until the end of file is reached. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ OSError If specified interface `iface` does not exist locally. RuntimeError If none or more than one input (`pcap`, `pcap_list`, `iface`) is specified. subprocess.CalledProcessError If the process exited with a non-zero exit code. 
subprocess.TimeoutExpired If `timeout` was expired and its value reached. Examples -------- >>> T2Utils.run_tranalyzer(pcap='/tmp/file.pcap', output_prefix='/tmp/') """ if (pcap and pcap_list) or (pcap and iface) or (iface and pcap_list): raise RuntimeError('Only one input (pcap, pcap_list, iface) may be specified') if not pcap and not iface and not pcap_list: raise RuntimeError('An input (pcap, pcap_list or iface) is required') if not t2_exec: t2_exec = T2Utils.t2_exec() if not t2_exec or not isfile(t2_exec): raise RuntimeError(f'Tranalyzer2 executable could not be found at {t2_exec}') cmd = [t2_exec] # Input arguments: -r pcap or -R pcap_list or -i iface if pcap: cmd.extend(['-r', pcap]) elif pcap_list: if isinstance(pcap_list, list): T2Utils.create_pcap_list(pcap_list, '/tmp/pcap_list.txt') pcap_list = '/tmp/pcap_list.txt' cmd.extend(['-R', pcap_list]) elif iface: if iface not in T2Utils.network_interfaces(): raise OSError(f'Interface {iface} does not exist locally') cmd.extend(['-i', iface]) # Output arguments: -w prefix / -l / -m / -s if output_prefix: cmd.extend(['-w', output_prefix]) if log_file: cmd.append('-l') if monitoring_file: cmd.append('-m') if packet_mode: cmd.append('-s') # Optional arguments if not loading_list and plugins: loading_list = '/tmp/plugins.load' T2Utils.create_plugin_list(plugins, outfile=loading_list, verbose=verbose) if loading_list: cmd.extend(['-b', loading_list]) if plugin_folder: cmd.extend(['-p', plugin_folder]) if bpf: cmd.append(bpf) subprocess.run(cmd, capture_output=not verbose, check=True, timeout=timeout)
def set_default(plugin: Union[str, List[str]], name: Union[str, List[str]] = None, outfile: str = None, verbose: bool = False)
-
Reset the value of a define to its default.
Parameters
plugin
:str
orlist
ofstr
-
plugin
can be:- a single plugin, e.g.,
'pluginName'
- a list of plugins, e.g.,
['pluginName1', 'pluginName2']
all
to apply the operation to all available plugins
- a single plugin, e.g.,
name
:str
orlist
ofstr
, default: None
- The name of a configuration flag to reset.
If
name
is set, only reset the value for the specified flag(s). outfile
:str
, default: None
- Path to a configuration file where the values will be reset.
Use
outfile='source'
to set the value in the header file. verbose
:bool
, default: False
-
- If
True
, print the output (stdout
andstderr
) of the command. - If
False
, do not print anything.
- If
Raises
NameError
- If a plugin could not be found.
NotImplementedError
- If trying to reset specific flag(s) with multiple plugins.
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
See Also
T2Utils.set_config
- Set the value of a define in a configuration or header file.
T2Utils.reset_config()
- Reset all configuration flags from a plugin to their default values.
Notes
Equivalent to:
$ t2conf pluginName -D name=default [-g outfile]
.Alias for
T2Utils.set_config(plugin, name, 'default', outfile)
.Examples
>>> T2Utils.set_default('arpDecode', 'MAX_IP') >>> T2Utils.set_default('basicStats', ['BS_VAR', 'BS_STDDEV']) >>> T2Utils.set_default(['arpDecode', 'basicStats'])
Expand source code
@staticmethod def set_default( plugin: Union[str, List[str]], name: Union[str, List[str]] = None, outfile: str = None, verbose: bool = False ): """Reset the value of a define to its default. Parameters ---------- plugin : str or list of str `plugin` can be: - a single plugin, e.g., `'pluginName'` - a list of plugins, e.g., `['pluginName1', 'pluginName2']` - `all` to apply the operation to all available plugins name : str or list of str, default: None The name of a configuration flag to reset. If `name` is set, only reset the value for the specified flag(s). outfile : str, default: None Path to a configuration file where the changes will be rest. Use `outfile='source'` to set the value in the header file. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ NameError If a plugin could not be found. NotImplementedError If trying to reset specific flag(s) with multiple plugins. subprocess.CalledProcessError If the process exited with a non-zero exit code. See Also -------- T2Utils.set_config : Set the value of a define in a configuration or header file. T2Utils.reset_config : Reset all configuration flags from a plugin to their default values. Notes ----- Equivalent to: `$ t2conf pluginName -D name=default [-g outfile]`. Alias for `T2Utils.set_config(plugin, name, 'default', outfile)`. 
Examples -------- >>> T2Utils.set_default('arpDecode', 'MAX_IP') >>> T2Utils.set_default('basicStats', ['BS_VAR', 'BS_STDDEV']) >>> T2Utils.set_default(['arpDecode', 'basicStats']) """ if name: if isinstance(plugin, list): raise NotImplementedError('Cannot reset specific flag(s) with list of plugins') elif plugin == 'all': raise NotImplementedError("Cannot reset specific flag(s) for 'all' plugins") if isinstance(name, list): for n in name: T2Utils.set_config(plugin, n, 'default', outfile=outfile, verbose=verbose) else: T2Utils.set_config(plugin, name, 'default', outfile=outfile, verbose=verbose) else: cmd = [T2Utils.T2CONF, '-y', '--reset'] accept_outfile = False if plugin == 'all': cmd.append('-a') elif isinstance(plugin, list): cmd.extend(plugin) else: accept_outfile = True cmd.append(plugin) if outfile and outfile != 'source': if not accept_outfile: raise NotImplementedError("Cannot reset configuration in specific file with list of plugins or 'all' plugins") cmd.extend(['-g', outfile]) subprocess.run(cmd, capture_output=not verbose, check=True)
def t2_exec(debug: bool = False) ‑> str
-
Return the path to Tranalyzer2 release or debug executable.
Parameters
debug
:bool
, default: False
-
- If
True
, returns the path to the debug executable.- If
False
, returns the path to the release executable.
- If
- If
Returns
str
- The path to Tranalyzer2 executable.
Examples
>>> T2Utils.t2_exec() /home/user/tranalyzer2-0.9.0/tranalyzer2/build/tranalyzer >>> T2Utils.t2_exec(debug=True) /home/user/tranalyzer2-0.9.0/tranalyzer2/debug/tranalyzer
Expand source code
@staticmethod
def t2_exec(
    debug: bool = False
) -> str:
    """Build the path to the Tranalyzer2 release or debug executable.

    The path is computed from `T2Utils.T2HOME`; the file is not checked
    for existence.

    Parameters
    ----------
    debug : bool, default: False
        - If `True`, returns the path to the debug executable.
        - If `False`, returns the path to the release executable.

    Returns
    -------
    str
        The path to Tranalyzer2 executable.

    Examples
    --------
    >>> T2Utils.t2_exec()
    /home/user/tranalyzer2-0.9.0/tranalyzer2/build/tranalyzer
    >>> T2Utils.t2_exec(debug=True)
    /home/user/tranalyzer2-0.9.0/tranalyzer2/debug/tranalyzer
    """
    if debug:
        build_dir = 'debug'
    else:
        build_dir = 'build'
    return join(T2Utils.T2HOME, 'tranalyzer2', build_dir, 'tranalyzer')
def tawk(program: str = None, filename: str = None, options: List[str] = None) ‑> str
-
Call
tawk
with theprogram
,options
andfilename
specified.Parameters
program
:str
, default: None
- A
tawk
program, e.g.,"$dstPort == 80 { print tuple5() }"
. filename
:str
, default: None
- Name of the file to analyze.
options
:list
ofstr
, default: None
- List of options to pass to
tawk
.
Returns
str
- The result of the
tawk
command.
Raises
RuntimeError
- If
filename
was specified, but the file does not exist. subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
Notes
Equivalent to:
$ tawk [options] 'program' filename
.Examples
>>> result = T2Utils.tawk(None, None, ['-V', 'flowStat=0x1234']) >>> print(result) >>> T2Utils.tawk('host("1.2.3.4") { aggr(proto()) }', 'file.txt')
Expand source code
@staticmethod def tawk( program: str = None, filename: str = None, options: List[str] = None ) -> str: """Call `tawk` with the `program`, `options` and `filename` specified. Parameters ---------- program : str, default: None A `tawk` program, e.g., `"$dstPort == 80 { print tuple5() }"`. filename : str, default: None Name of the file to analyze. options : list of str, default: None List of options to pass to `tawk`. Returns ------- str The result of the `tawk` command. Raises ------ RuntimeError If `filename` was specified, but the file does not exist. subprocess.CalledProcessError If the process exited with a non-zero exit code. Notes ----- Equivalent to: `$ tawk [options] 'program' filename`. Examples -------- >>> result = T2Utils.tawk(None, None, ['-V', 'flowStat=0x1234']) >>> print(result) >>> T2Utils.tawk('host("1.2.3.4") { aggr(proto()) }', 'file.txt') """ if filename and not isfile(filename): raise RuntimeError(f"Input file '{filename}' does not exist") cmd = [T2Utils.TAWK] if program: cmd.append(program) if options: cmd.extend(options) if filename: cmd.append(filename) result = subprocess.run(cmd, capture_output=True, check=True) result = result.stdout.decode('utf8').strip() return result
def to_json_array(infile: str, delimiter: str = '\t') ‑> List[Dict[str, Any]]
-
Convert a flow or packet file to an array of JSON objects.
Parameters
infile
:str
- Path to a flow or packet file.
delimiter
:str
, default: '\t'
- Field delimiter used in the input file.
Returns
list
ofdict
- The list of rows contained in
infile
. Each row is represented as a dict with the column names as key.
Raises
json.decoder.JSONDecodeError
- If
infile
was a JSON file and the decoding process failed.
Examples
>>> flows = T2Utils.to_json_array('/tmp/file_flows.txt') >>> flows [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...]
Expand source code
@staticmethod def to_json_array( infile: str, delimiter: str = '\t' ) -> List[Dict[str, Any]]: """Convert a flow or packet file to an array of JSON objects. Parameters ---------- infile : str Path to a flow or packet file. delimiter : str, default: '\\t' Field delimiter used in the input file. Returns ------- list of dict The list of rows contained in `infile`. Each row is represented as a dict with the column names as key. Raises ------ json.decoder.JSONDecodeError If `infile` was a JSON file and the decoding process failed. Examples -------- >>> flows = T2Utils.to_json_array('/tmp/file_flows.txt') >>> flows [{'dir': 'A', 'flowInd': 1, ...}, {'dir': 'B', 'flowInd': 1, ...}, ...] """ data = [] if not infile.endswith('.json'): data = T2Utils._tsv_to_list(infile, delimiter) else: with open(infile) as f: for flow in f: if flow in ('[\n', ']\n'): continue data.append(json.loads(flow.strip(','))) return data
def to_pandas(infile: str, delimiter: str = None) ‑> pandas.core.frame.DataFrame
-
Convert a flow or packet file to pandas
DataFrame
.Parameters
infile
:str
- Path to a flow or packet file.
delimiter
:str
, default: None
- Field delimiter used in the input file.
Returns
pd.DataFrame
- DataFrame holding the tabular data stored in
infile
.
Examples
>>> df = T2Utils.to_pandas('/tmp/file_flows.txt') >>> type(df) <class 'pandas.core.frame.DataFrame'>
Expand source code
@staticmethod def to_pandas( infile: str, delimiter: str = None ) -> "pandas.core.frame.DataFrame": """Convert a flow or packet file to pandas `DataFrame`. Parameters ---------- infile : str Path to a flow or packet file. delimiter : str, default: None Field delimiter used in the input file. Returns ------- pd.DataFrame DataFrame holding the tabular data stored in `infile`. Examples -------- >>> df = T2Utils.to_pandas('/tmp/file_flows.txt') >>> type(df) <class 'pandas.core.frame.DataFrame'> """ import pandas as pd if infile.endswith('.json'): return pd.DataFrame(T2Utils.to_json_array(infile)) else: return pd.read_table(infile, delimiter=delimiter)
def to_pdf(pcap: str = None, flow_file: str = None, prefix: str = None, config: bool = True, reset_config: bool = True, open_pdf: bool = True, verbose: bool = False)
-
Generate a PDF/LaTeX report from a PCAP or Tranalyzer flow file.
Parameters
pcap
:str
, default: None
- PCAP file to analyze.
flow_file
:str
, default: None
- Flow file to analyze.
prefix
:str
, default: None
- Append 'prefix' to any output file produced.
config
:bool
, default: True
- If
True
, configure and build Tranalyzer2 and the plugins as required. reset_config
:bool
, default: True
- If
True
, reset tranalyzer2 and the plugins configuration at the end. open_pdf
:bool
, default: True
- If
True
, automatically open the generated PDF. verbose
:bool
, default: False
-
- If
True
, print the output (stdout
andstderr
) of the command. - If
False
, do not print anything.
- If
Raises
RuntimeError
-
- If one of the required parameter (pcap or flow_file) was not specified.
- If multiple inputs (pcap and flow_file) were specified.
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
Notes
Equivalent to:
$ t2fm [-b] [-A] [-r pcap|-F file]
.Examples
>>> T2Utils.to_pdf(pcap='file.pcap') >>> T2Utils.to_pdf(flow_file='file_flows.txt')
Expand source code
@staticmethod def to_pdf( pcap: str = None, flow_file: str = None, prefix: str = None, config: bool = True, reset_config: bool = True, open_pdf: bool = True, verbose: bool = False ): """Generate a PDF/LaTeX report from a PCAP or Tranalyzer flow file." Parameters ---------- pcap : str, default: None PCAP file to analyze. file : str, default: None Flow file to analyze. prefix : str, default: None Append 'prefix' to any output file produced. config : bool, default: True If `True`, configure and build Tranalyzer2 and the plugins as required. reset_config : bool, default: True If `True`, reset tranalyzer2 and the plugins configuration at the end. open_pdf : bool, default: True If `True`, automatically open the generated PDF. verbose : bool, default: False - If `True`, print the output (`stdout` and `stderr`) of the command. - If `False`, do not print anything. Raises ------ RuntimeError - If one of the required parameter (pcap or flow_file) was not specified. - If multiple inputs (pcap and flow_file) were specified. subprocess.CalledProcessError If the process exited with a non-zero exit code. Notes ----- Equivalent to: `$ t2fm [-b] [-A] [-r pcap|-F file]`. Examples -------- >>> T2Utils.to_pdf(pcap='file.pcap') >>> T2Utils.to_pdf(flow_file='file_flows.txt') """ if not pcap and not flow_file: raise RuntimeError('One of (pcap, flow_file) must be specified.') elif pcap and flow_file: raise RuntimeError('Only one of (pcap, flow_file) must be specified.') cmd = [T2Utils.T2FM, '-y'] # Input if pcap: cmd.extend(['-r', pcap]) elif flow_file: cmd.extend(['-F', flow_file]) # Output if prefix: cmd.extend(['-w', prefix]) # Options if config: cmd.append('-b') if reset_config: cmd.append('--reset') if open_pdf: cmd.append('-A') subprocess.run(cmd, capture_output=not verbose, check=True)
def unload(plugin: Union[str, List[str]], plugin_folder: str = None, verbose: bool = False)
-
Remove (unload) a plugin from the plugin folder.
Parameters
plugin : str or list of str
- plugin can be:
- a single plugin, e.g., 'pluginName'
- a list of plugins, e.g., ['pluginName1', 'pluginName2']
- a plugin loading list, e.g., 'myPlugins.txt'
- all to apply the operation to all available plugins
plugin_folder : str, default: None
- The plugin folder from where the plugin must be unloaded. If None, use the default plugin folder ('$HOME/.tranalyzer/plugins').
verbose : bool, default: False
- If True, print the output (stdout and stderr) of the command.
- If False, do not print anything.
Raises
subprocess.CalledProcessError
- If the process exited with a non-zero exit code.
Notes
Equivalent to:
- $ t2build -u pluginName
- $ t2build -u pluginName1 pluginName2
- $ t2build -u -b myPlugins.txt
- $ t2build -u -a
Examples
>>> T2Utils.unload('arpDecode')
>>> T2Utils.unload(['arpDecode', 'txtSink'])
>>> T2Utils.unload('/tmp/myPlugins.load')
>>> T2Utils.unload('all')
Expand source code
@staticmethod
def unload(
    plugin: Union[str, List[str]],
    plugin_folder: str = None,
    verbose: bool = False
):
    """Remove (unload) one or several plugins from the plugin folder.

    Parameters
    ----------
    plugin : str or list of str
        What to unload. `plugin` can be:

        - a single plugin, e.g., `'pluginName'`
        - a list of plugins, e.g., `['pluginName1', 'pluginName2']`
        - a plugin loading list, e.g., `'myPlugins.txt'`
        - `all` to apply the operation to all available plugins
    plugin_folder : str, default: None
        The plugin folder from where the plugin must be unloaded.
        If `None`, use the default plugin folder (`'$HOME/.tranalyzer/plugins'`).
    verbose : bool, default: False
        - If `True`, print the output (`stdout` and `stderr`) of the command.
        - If `False`, do not print anything.

    Raises
    ------
    subprocess.CalledProcessError
        If the process exited with a non-zero exit code.

    Notes
    -----
    Equivalent to:

    - `$ t2build -u pluginName`
    - `$ t2build -u pluginName1 pluginName2`
    - `$ t2build -u -b myPlugins.txt`
    - `$ t2build -u -a`

    Examples
    --------
    >>> T2Utils.unload('arpDecode')
    >>> T2Utils.unload(['arpDecode', 'txtSink'])
    >>> T2Utils.unload('/tmp/myPlugins.load')
    >>> T2Utils.unload('all')
    """
    # Optional '-p <folder>' pair, placed before the plugin selection.
    folder_args = ['-p', plugin_folder] if plugin_folder else []

    # Work out how the plugin(s) are designated on the command line.
    # The keyword 'all' is checked first, then a list of names, then an
    # existing file (treated as a plugin loading list), and finally a
    # plain plugin name.
    if plugin == 'all':
        selection_args = ['-a']
    elif isinstance(plugin, list):
        selection_args = list(plugin)
    elif isfile(plugin):
        selection_args = ['-b', plugin]
    else:
        selection_args = [plugin]

    command = [T2Utils.T2BUILD, '-y', '-u', *folder_args, *selection_args]
    subprocess.run(command, capture_output=not verbose, check=True)
def valid_plugin_names() ‑> List[str]
-
Return a list of valid plugin names.
Returns
list of str
- The list of valid plugin names.
Examples
>>> T2Utils.valid_plugin_names()
['arpDecode', ..., 'tranalyzer2', ...]
Expand source code
@staticmethod
def valid_plugin_names() -> List[str]:
    """Return the sorted list of names accepted as plugin names.

    The list is made of `'tranalyzer2'` plus every name returned by
    `T2Utils.plugins()`.

    Returns
    -------
    list of str
        The list of valid plugin names.

    Examples
    --------
    >>> T2Utils.valid_plugin_names()
    ['arpDecode', ..., 'tranalyzer2', ...]
    """
    return sorted(['tranalyzer2', *T2Utils.plugins()])