AlienVault USM event visualizer
Search
Graphistry helper
Plot
Enter your AlienVault USM credentials (and, optionally, your Graphistry account details) in the first cells, customize the search parameters, then run the plot cell.
[ ]:
!pip install requests_oauthlib -q
[ ]:
from unimatrix import UnimatrixClient
import graphistry
# To specify Graphistry account & server, use:
# graphistry.register(api=3, username='...', password='...', protocol='https', server='hub.graphistry.com')
# For more options, see https://github.com/graphistry/pygraphistry#configure
# The three string arguments are placeholders: replace them with your own
# values before running the search cell.
# NOTE(review): presumably these are the USM subdomain and the API client
# id / secret, in that order — confirm against the unimatrix client docs.
client = UnimatrixClient("my_subdomain", "client_id", "client_secret")
Search
[ ]:
# Search filters: only "Network connection" events reported by the
# "AlienVault Agent - Windows EventLog" plugin. Edit these for a custom search.
params = {
    'plugin': 'AlienVault Agent - Windows EventLog',
    'event_name': 'Network connection',
}

# Request the matching events with num_items=25000.
# NOTE(review): presumably num_items is an upper bound and the result is a
# pandas DataFrame (the code below calls len() and .sample() on it) — confirm.
events_df = client.get_events_df(num_items=25000, params=params)
print('# rows', len(events_df))

# Preview a few random rows. The sample size is capped at the number of rows
# because DataFrame.sample raises ValueError when asked for more rows than
# exist (e.g. when the search returns fewer than three events).
events_df.sample(min(3, len(events_df)))
Helper
[ ]:
def graph(events_df):
    """Build a colored Graphistry hypergraph from a frame of events.

    Args:
        events_df: DataFrame of events. The columns read here are
            ``source_process``, ``destination_organisation``,
            ``source_process_id``, ``source_canonical``, ``event_category``
            and every column named in the edge map below.

    Returns:
        The ``'graph'`` entry of ``graphistry.hypergraph(...)``, with node
        colors bound to ``p_color`` (one color per node category), edge
        colors bound to ``e_color`` (one color per ``event_category``), and
        edges that touch a missing ('nan') value removed.
    """
    # Keep only the last path component of the process path. Non-string
    # values (e.g. NaN for a missing path) pass through unchanged instead of
    # raising AttributeError on ``.split``.
    red_events = events_df.assign(
        source_process=events_df['source_process'].apply(
            lambda v: v.split('\\')[-1] if isinstance(v, str) else v
        ),
        destination_organisation=events_df.destination_organisation.fillna('INTERNAL'),
    )

    # Qualify the process id with its host identifier so equal ids on
    # different hosts become different nodes. Column-wise concatenation
    # replaces the slower row-by-row ``apply(axis=1)``.
    red_events = red_events.assign(
        source_process_id=(
            red_events['source_process_id'].map(str)
            + '::'
            + red_events['source_canonical'].map(str)
        )
    ).reset_index()

    # Source column -> destination columns to connect.
    # NOTE(review): 'source_process_id' was listed twice under
    # 'source_username'; the duplicate is dropped here. If a different column
    # (e.g. 'source_process') was intended, add it back explicitly.
    edges = {
        'source_username': ['source_canonical', 'source_process_id'],
        'source_process_id': ['source_process', 'iocs', 'event_category', 'watchlist'],
        'source_canonical': ['source_address', 'source_process', 'source_name', 'source_process_id',
                             'destination_name', 'destination_canonical', 'destination_address'],
        'destination_canonical': ['destination_name', 'destination_address'],
    }

    # Every column that appears as an edge endpoint becomes an entity type.
    # dict.fromkeys de-duplicates while preserving first-seen order; the
    # previous list(set(...)) order varied between interpreter sessions,
    # which made the color assignment below non-reproducible.
    node_types = list(edges)
    for dsts in edges.values():
        node_types.extend(dsts)
    node_types = list(dict.fromkeys(node_types))

    # All values are stringified before building the hypergraph, so missing
    # values become the literal string 'nan' (filtered out further down).
    # See the pygraphistry hypergraph docs for ``direct`` and ``opts``.
    g = graphistry.hypergraph(
        red_events.astype(str),
        entity_types=node_types,
        direct=True,
        opts={
            'EDGES': edges,
            'CATEGORIES': {
                # NOTE(review): 'source_canonical' was listed twice here; the
                # duplicate is dropped. Confirm whether 'source_name' was meant.
                'asset': ['source_canonical', 'source_address', 'destination_name',
                          'destination_canonical', 'destination_address'],
            },
        })['graph']

    # Palette index (0-9) per distinct value, in order of first appearance.
    # Edge colors are computed before the 'nan' filter below, as before, so
    # existing color assignments do not shift.
    type_to_color = {t: i % 10 for i, t in enumerate(g._nodes['category'].unique())}
    event_to_color = {e: i % 10 for i, e in enumerate(g._edges['event_category'].unique())}

    g = g.nodes(
        g._nodes.assign(p_color=g._nodes['category'].map(type_to_color))
    ).bind(point_color='p_color')

    # Drop edges whose endpoint has a missing value, i.e. an id containing a
    # '::'-delimited segment that is exactly 'nan'. The earlier pattern
    # '.*::nan' was not anchored at the end, so it also removed edges to
    # legitimate values that merely start with "nan" (e.g. "nancy-pc").
    # NOTE(review): assumes node ids use '::' as the column/value separator,
    # as the original filter did.
    missing = r'::nan(?:::|$)'
    touches_missing = (
        g._edges.dst.str.contains(missing) | g._edges.src.str.contains(missing)
    )
    g = g.edges(g._edges[~touches_missing])

    g = g.edges(
        g._edges.assign(e_color=g._edges['event_category'].map(event_to_color))
    ).bind(edge_color='e_color')
    return g
Visualize
[ ]:
# Build the colored graph from the events fetched in the search cell and
# render it; as the cell's last expression, the plot result is displayed.
graph(events_df).plot()
[ ]: