Visual GPU Log Analytics Part II: GPU dataframes with RAPIDS Python cudf bindings#
Graphistry is great – Graphistry and RAPIDS/BlazingDB are better!
This tutorial series visually analyzes Zeek/Bro network connection logs using different compute engines:
Part I: CPU Baseline in Python Pandas
Part III: GPU SQL - deprecated as Dask-SQL replaced BlazingSQL in the RAPIDS ecosystem
Part II Contents:
Time a full ETL & visual analysis flow using the GPU-based RAPIDS Python cudf bindings and Graphistry:
Load data
Analyze data
Visualize data
TIP: If you get out of memory errors, you usually must restart the kernel & refresh the page
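Before resorting to a restart, it is sometimes possible to reclaim device memory by dropping references to large frames; a minimal sketch (assuming no RMM memory pool is pinning the allocations, and that the large frames created later in this notebook are no longer needed):
[ ]:
# Hypothetical cleanup: drop big cudf objects and force a garbage collection.
# This may release GPU memory without a kernel restart, but is not guaranteed to.
import gc
del cdf  # assumes cdf (loaded below) exists and is no longer needed
gc.collect()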
[1]:
#!pip install graphistry -q
import pandas as pd
import numpy as np
import cudf
import graphistry
graphistry.__version__
# To specify Graphistry account & server, use:
# graphistry.register(api=3, username='...', password='...', protocol='https', server='hub.graphistry.com')
# For more options, see https://github.com/graphistry/pygraphistry#configure
[2]:
# check nvidia config
!nvidia-smi
Tue Nov 22 06:20:14 2022
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 470.141.03 Driver Version: 470.141.03 CUDA Version: 11.4 |
|-------------------------------+----------------------+----------------------+
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
| | | MIG M. |
|===============================+======================+======================|
| 0 Tesla T4 Off | 00000000:00:1E.0 Off | 0 |
| N/A 38C P0 27W / 70W | 8037MiB / 15109MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
+-----------------------------------------------------------------------------+
| Processes: |
| GPU GI CI PID Type Process name GPU Memory |
| ID ID Usage |
|=============================================================================|
+-----------------------------------------------------------------------------+
1. Load data#
[16]:
%%time
# download data
#!if [ ! -f conn.log ]; then \
# curl https://www.secrepo.com/maccdc2012/conn.log.gz | gzip -d > conn.log; \
#fi
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 523M 100 523M 0 0 31.5M 0 0:00:16 0:00:16 --:--:-- 30.0M
CPU times: user 240 ms, sys: 105 ms, total: 345 ms
Wall time: 17.3 s
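If shelling out to curl is inconvenient, a pure-Python download is an alternative; a minimal sketch assuming the same URL and output path:
[ ]:
# Hypothetical pure-Python equivalent of the commented-out curl download above
import gzip, os, urllib.request

if not os.path.exists('conn.log'):
    with urllib.request.urlopen('https://www.secrepo.com/maccdc2012/conn.log.gz') as resp:
        data = gzip.decompress(resp.read())  # ~0.5 GB decompressed, held in RAM
    with open('conn.log', 'wb') as out:
        out.write(data)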
[4]:
!head -n 3 conn.log
1331901000.000000 CCUIP21wTjqkj8ZqX5 192.168.202.79 50463 192.168.229.251 80 tcp - - - - SH - 0 Fa 1 52 1 52 (empty)
1331901000.000000 Csssjd3tX0yOTPDpng 192.168.202.79 46117 192.168.229.254 443 tcp - - - - SF - 0 dDafFr 3 382 9 994 (empty)
1331901000.000000 CHEt7z3AzG4gyCNgci 192.168.202.79 50465 192.168.229.251 80 tcp http 0.010000 166 214 SF - 0 ShADfFa 4 382 3 382 (empty)
[5]:
# OPTIONAL: For slow or limited devices, work on a subset:
LIMIT = 12000000
[6]:
%%time
cdf = cudf.read_csv("./conn.log", sep="\t", header=None,
names=["time", "uid", "id.orig_h", "id.orig_p", "id.resp_h", "id.resp_p", "proto", "service",
"duration", "orig_bytes", "resp_bytes", "conn_state", "local_orig", "missed_bytes",
"history", "orig_pkts", "orig_ip_bytes", "resp_pkts", "resp_ip_bytes", "tunnel_parents"],
dtype=['date', 'str', 'str', 'int', 'str', 'int', 'str', 'str',
'int', 'int', 'int', 'str', 'str', 'int',
'str', 'int', 'int', 'int', 'int', 'str'],
na_values=['-'], index_col=False, nrows=LIMIT)
CPU times: user 2.02 s, sys: 676 ms, total: 2.69 s
Wall time: 2.69 s
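Note that the 'date' dtype lands the epoch-second timestamps in the 1700s (see the head() output below) rather than in 2012. If real timestamps matter for your analysis, one option is to parse the column as float seconds and convert it explicitly; a sketch under the assumption that Zeek's ts field is float seconds since the Unix epoch:
[ ]:
# Hypothetical variant: parse ts as epoch seconds instead of the 'date' dtype
cdf2 = cudf.read_csv("./conn.log", sep="\t", header=None,
    names=["time", "uid", "id.orig_h", "id.orig_p", "id.resp_h", "id.resp_p", "proto", "service",
           "duration", "orig_bytes", "resp_bytes", "conn_state", "local_orig", "missed_bytes",
           "history", "orig_pkts", "orig_ip_bytes", "resp_pkts", "resp_ip_bytes", "tunnel_parents"],
    dtype=['float64', 'str', 'str', 'int', 'str', 'int', 'str', 'str',
           'int', 'int', 'int', 'str', 'str', 'int',
           'str', 'int', 'int', 'int', 'int', 'str'],
    na_values=['-'], nrows=LIMIT)
# convert float seconds -> int milliseconds -> datetime (pandas-style unit handling)
cdf2['time'] = cudf.to_datetime((cdf2['time'] * 1000).astype('int64'), unit='ms')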
[7]:
# Fill numeric nulls with 0; leave the string columns (IDs, IPs, protocol/state fields) untouched
for c in cdf.columns:
    if c in ['uid', 'id.orig_h', 'id.resp_h', 'proto', 'service', 'conn_state', 'history', 'tunnel_parents', 'local_orig']:
        continue
    cdf[c] = cdf[c].fillna(0)
[8]:
print('# rows', len(cdf))
cdf.head(3)
# rows 12000000
[8]:
time | uid | id.orig_h | id.orig_p | id.resp_h | id.resp_p | proto | service | duration | orig_bytes | resp_bytes | conn_state | local_orig | missed_bytes | history | orig_pkts | orig_ip_bytes | resp_pkts | resp_ip_bytes | tunnel_parents | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 1753-11-29 22:43:41.128654848 | CCUIP21wTjqkj8ZqX5 | 192.168.202.79 | 50463 | 192.168.229.251 | 80 | tcp | <NA> | 0 | 0 | 0 | SH | <NA> | 0 | Fa | 1 | 52 | 1 | 52 | (empty) |
1 | 1753-09-09 22:43:41.128654848 | Csssjd3tX0yOTPDpng | 192.168.202.79 | 46117 | 192.168.229.254 | 443 | tcp | <NA> | 0 | 0 | 0 | SF | <NA> | 0 | dDafFr | 3 | 382 | 9 | 994 | (empty) |
2 | 1753-10-06 22:43:41.128654848 | CHEt7z3AzG4gyCNgci | 192.168.202.79 | 50465 | 192.168.229.251 | 80 | tcp | http | 0 | 166 | 214 | SF | <NA> | 0 | ShADfFa | 4 | 382 | 3 | 382 | (empty) |
2. Analyze data#
Summarize network activity between every communicating src/dst IP pair, split by connection state
RAPIDS currently fails when exceeding GPU memory, so limit workload size as needed
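A quick way to gauge how much headroom is left before picking a workload size is to query the device, either with nvidia-smi as above or programmatically; a sketch using pynvml (assumption: the pynvml package is installed):
[ ]:
# Hypothetical check of free GPU memory before running a large groupby
import pynvml

pynvml.nvmlInit()
handle = pynvml.nvmlDeviceGetHandleByIndex(0)  # GPU 0
mem = pynvml.nvmlDeviceGetMemoryInfo(handle)
print('free GPU memory (MiB):', mem.free // (1024 ** 2))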
[9]:
%%time
cdf_summary=(cdf
.pipe(lambda df: df.assign(sum_bytes=df.orig_bytes + df.resp_bytes))
.groupby(['id.orig_h', 'id.resp_h', 'conn_state'])
.agg({
'time': ['min', 'max', 'count'],
'id.resp_p': ['count'],
'uid': ['count'],
'duration': ['min', 'max', 'mean'],
'orig_bytes': ['min', 'max', 'sum', 'mean'],
'resp_bytes': ['min', 'max', 'sum', 'mean'],
'sum_bytes': ['min', 'max', 'sum', 'mean']
}))
CPU times: user 1.19 s, sys: 68.2 ms, total: 1.26 s
Wall time: 1.26 s
[10]:
print('# rows', len(cdf_summary))
cdf_summary.head(3).to_pandas()
# rows 50140
[10]:
time | id.resp_p | uid | duration | orig_bytes | resp_bytes | sum_bytes | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
min | max | count | count | count | min | max | mean | min | max | sum | mean | min | max | sum | mean | min | max | sum | mean | |||
id.orig_h | id.resp_h | conn_state | ||||||||||||||||||||
0.0.0.0 | 255.255.255.255 | S0 | 1753-12-19 22:43:41.128654848 | 1774-05-02 22:43:41.128654848 | 87 | 87 | 87 | 0 | 142 | 17.620690 | 0 | 9323 | 74099 | 851.712644 | 0 | 0 | 0 | 0.0 | 0 | 9323 | 74099 | 851.712644 |
10.10.10.10 | 10.255.255.255 | S0 | 1755-03-06 22:43:41.128654848 | 1769-12-03 22:43:41.128654848 | 27 | 27 | 27 | 0 | 52 | 5.296296 | 0 | 1062 | 4558 | 168.814815 | 0 | 0 | 0 | 0.0 | 0 | 1062 | 4558 | 168.814815 |
10.10.10.10 | 192.168.202.78 | OTH | 1753-11-09 22:43:41.128654848 | 1773-12-17 22:43:41.128654848 | 34 | 34 | 34 | 0 | 95 | 10.352941 | 0 | 0 | 0 | 0.000000 | 0 | 0 | 0 | 0.0 | 0 | 0 | 0 | 0.000000
3. Visualize data#
Nodes:
IPs
Bigger when more sessions (split by connection state) involve them
Edges:
src_ip -> dest_ip, split by connection state
[13]:
# flatten multi-index
cdfs2 = cdf_summary.copy(deep=False).reset_index()
cdfs2.columns = [''.join(c) for c in cdfs2.columns]
cdfs2.columns
[13]:
Index(['id.orig_h', 'id.resp_h', 'conn_state', 'timemin', 'timemax',
'timecount', 'id.resp_pcount', 'uidcount', 'durationmin', 'durationmax',
'durationmean', 'orig_bytesmin', 'orig_bytesmax', 'orig_bytessum',
'orig_bytesmean', 'resp_bytesmin', 'resp_bytesmax', 'resp_bytessum',
'resp_bytesmean', 'sum_bytesmin', 'sum_bytesmax', 'sum_bytessum',
'sum_bytesmean'],
dtype='object')
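Before plotting, a quick sanity check on the flattened frame can help, e.g. listing the heaviest src/dst/state triples by total bytes (a sketch using the flattened column names above):
[ ]:
# Sketch: top talkers by total bytes exchanged
cdfs2.sort_values('sum_bytessum', ascending=False)[
    ['id.orig_h', 'id.resp_h', 'conn_state', 'uidcount', 'sum_bytessum']
].head(5).to_pandas()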
[14]:
hg = graphistry.hypergraph(
cdfs2,
['id.orig_h', 'id.resp_h'],
direct=True,
opts={
'CATEGORIES': {
'ip': ['id.orig_h', 'id.resp_h']
}
    }, engine='cudf')  # or pass cdfs2.to_pandas() and keep the default engine='pandas'
# links 50140
# events 50140
# attrib entities 5048
[15]:
hg['graph'].plot()
[15]:
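An alternative to the hypergraph transform is to bind the summary rows directly as edges between source and destination IPs; a minimal sketch (assuming the flattened cdfs2 frame above):
[ ]:
# Hypothetical simpler encoding: each summary row becomes one src -> dst edge
g = (graphistry
     .edges(cdfs2.to_pandas())  # to_pandas() for broad compatibility across pygraphistry versions
     .bind(source='id.orig_h', destination='id.resp_h'))
g.plot()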
Next Steps#
Part I: CPU Baseline in Python Pandas
Part III: GPU SQL - deprecated as Dask-SQL replaced BlazingSQL in the RAPIDS ecosystem