graphistry.compute package#
Subpackages#
Submodules#
graphistry.compute.ASTSerializable module#
- class graphistry.compute.ASTSerializable.ASTSerializable#
Bases:
ABCInternal, not intended for use outside of this module. Class name becomes o[‘type’], and all non reserved_fields become JSON-typed key
- classmethod from_json(d, validate=True)#
Given c.to_json(), hydrate back c
- Args:
d: Dictionary from to_json() validate: If True (default), validate after parsing
- Returns:
Hydrated AST object
- Raises:
GFQLValidationError: If validate=True and validation fails
- Parameters:
d (Dict[str, None | bool | str | float | int | List[None | bool | str | float | int | List[JSONVal] | Dict[str, JSONVal]] | Dict[str, None | bool | str | float | int | List[JSONVal] | Dict[str, JSONVal]]])
validate (bool)
- Return type:
- reserved_fields = ['type']#
- to_json(validate=True)#
Returns JSON-compatible dictionry {“type”: “ClassName”, “arg1”: val1, …} Emits all non-reserved instance fields
- Return type:
Dict[str, None | bool | str | float | int | List[None | bool | str | float | int | List[JSONVal] | Dict[str, JSONVal]] | Dict[str, None | bool | str | float | int | List[JSONVal] | Dict[str, JSONVal]]]
- validate(collect_all=False)#
Validate this AST node.
- Args:
- collect_all: If True, collect all errors instead of raising on first.
If False (default), raise on first error.
- Returns:
If collect_all=True: List of validation errors (empty if valid) If collect_all=False: None if valid
- Raises:
GFQLValidationError: If collect_all=False and validation fails
- Parameters:
collect_all (bool)
- Return type:
List[GFQLValidationError] | None
graphistry.compute.ComputeMixin module#
- class graphistry.compute.ComputeMixin.ComputeMixin(*a, **kw)#
Bases:
Plottable- chain(*args, **kwargs)#
Chain a list of ASTObject (node/edge) traversal operations
Return subgraph of matches according to the list of node & edge matchers If any matchers are named, add a correspondingly named boolean-valued column to the output
For direct calls, exposes convenience List[ASTObject]. Internal operational should prefer Chain.
Use engine=’cudf’ to force automatic GPU acceleration mode
- Parameters:
ops – List[ASTObject] Various node and edge matchers
validate_schema – Whether to validate the chain against the graph schema before executing
- Returns:
Plotter
- Return type:
Plotter
Example: Find nodes of some type
from graphistry.ast import n people_nodes_df = g.chain([ n({"type": "person"}) ])._nodes
Example: Find 2-hop edge sequences with some attribute
from graphistry.ast import e_forward g_2_hops = g.chain([ e_forward({"interesting": True}, hops=2) ]) g_2_hops.plot()
Example: Find any node 1-2 hops out from another node, and label each hop
from graphistry.ast import n, e_undirected g_2_hops = g.chain([ n({g._node: "a"}), e_undirected(name="hop1"), e_undirected(name="hop2") ]) print('# first-hop edges:', len(g_2_hops._edges[ g_2_hops._edges.hop1 == True ]))
Example: Transaction nodes between two kinds of risky nodes
from graphistry.ast import n, e_forward, e_reverse g_risky = g.chain([ n({"risk1": True}), e_forward(to_fixed=True), n({"type": "transaction"}, name="hit"), e_reverse(to_fixed=True), n({"risk2": True}) ]) print('# hits:', len(g_risky._nodes[ g_risky._nodes.hit ]))
Example: Filter by multiple node types at each step using is_in
from graphistry.ast import n, e_forward, e_reverse, is_in g_risky = g.chain([ n({"type": is_in(["person", "company"])}), e_forward({"e_type": is_in(["owns", "reviews"])}, to_fixed=True), n({"type": is_in(["transaction", "account"])}, name="hit"), e_reverse(to_fixed=True), n({"risk2": True}) ]) print('# hits:', len(g_risky._nodes[ g_risky._nodes.hit ]))
Example: Run with automatic GPU acceleration
import cudf import graphistry e_gdf = cudf.from_pandas(df) g1 = graphistry.edges(e_gdf, 's', 'd') g2 = g1.chain([ ... ])
Example: Run with automatic GPU acceleration, and force GPU mode
import cudf import graphistry e_gdf = cudf.from_pandas(df) g1 = graphistry.edges(e_gdf, 's', 'd') g2 = g1.chain([ ... ], engine='cudf')
- chain_remote(*args, **kwargs)#
Remotely run GFQL chain query on a remote dataset.
Uses the latest bound _dataset_id, and uploads current dataset if not already bound. Note that rebinding calls of edges() and nodes() reset the _dataset_id binding.
- Parameters:
chain (Union[Chain, List[ASTObject], Dict[str, JSONVal]]) – GFQL chain query as a Python object or in serialized JSON format
api_token (Optional[str]) – Optional JWT token. If not provided, refreshes JWT and uses that.
dataset_id (Optional[str]) – Optional dataset_id. If not provided, will fallback to self._dataset_id. If not provided, will upload current data, store that dataset_id, and run GFQL against that.
output_type (OutputType) – Whether to return nodes and edges (“all”, default), Plottable with just nodes (“nodes”), or Plottable with just edges (“edges”). For just a dataframe of the resultant graph shape (output_type=”shape”), use instead chain_remote_shape().
format (Optional[FormatType]) – What format to fetch results. We recommend a columnar format such as parquet, which it defaults to when output_type is not shape.
df_export_args (Optional[Dict, str, Any]]) – When server parses data, any additional parameters to pass in.
node_col_subset (Optional[List[str]]) – When server returns nodes, what property subset to return. Defaults to all.
edge_col_subset (Optional[List[str]]) – When server returns edges, what property subset to return. Defaults to all.
engine (Optional[Literal["pandas", "cudf]]) – Override which run mode GFQL uses. By default, inspects graph size to decide.
validate (bool) – Whether to locally test code, and if uploading data, the data. Default true.
- Return type:
- Example: Explicitly upload graph and return subgraph where nodes have at least one edge
import graphistry from graphistry import n, e es = pandas.DataFrame({'src': [0,1,2], 'dst': [1,2,0]}) g1 = graphistry.edges(es, 'src', 'dst').upload() assert g1._dataset_id, "Graph should have uploaded" g2 = g1.chain_remote([n(), e(), n()]) print(f'dataset id: {g2._dataset_id}, # nodes: {len(g2._nodes)}')
- Example: Return subgraph where nodes have at least one edge, with implicit upload
import graphistry from graphistry import n, e es = pandas.DataFrame({'src': [0,1,2], 'dst': [1,2,0]}) g1 = graphistry.edges(es, 'src', 'dst') g2 = g1.chain_remote([n(), e(), n()]) print(f'dataset id: {g2._dataset_id}, # nodes: {len(g2._nodes)}')
- Example: Return subgraph where nodes have at least one edge, with implicit upload, and force GPU mode
import graphistry from graphistry import n, e es = pandas.DataFrame({'src': [0,1,2], 'dst': [1,2,0]}) g1 = graphistry.edges(es, 'src', 'dst') g2 = g1.chain_remote([n(), e(), n()], engine='cudf') print(f'dataset id: {g2._dataset_id}, # nodes: {len(g2._nodes)}')
- chain_remote_shape(*args, **kwargs)#
Like chain_remote(), except instead of returning a Plottable, returns a pd.DataFrame of the shape of the resulting graph.
Useful as a fast success indicator that avoids the need to return a full graph when a match finds hits, return just the metadata.
- Example: Upload graph and compute number of nodes with at least one edge
import graphistry es = pandas.DataFrame({'src': [0,1,2], 'dst': [1,2,0]}) g1 = graphistry.edges(es, 'src', 'dst').upload() assert g1._dataset_id, "Graph should have uploaded" shape_df = g1.chain_remote_shape([n(), e(), n()]) print(shape_df)
- Example: Compute number of nodes with at least one edge, with implicit upload, and force GPU mode
import graphistry es = pandas.DataFrame({'src': [0,1,2], 'dst': [1,2,0]}) g1 = graphistry.edges(es, 'src', 'dst') shape_df = g1.chain_remote_shape([n(), e(), n()], engine='cudf') print(shape_df)
- Return type:
DataFrame
- collapse(node, attribute, column, self_edges=False, unwrap=False, verbose=False)#
Topology-aware collapse by given column attribute starting at node
Traverses directed graph from start node node and collapses clusters of nodes that share the same property so that topology is preserved.
- Parameters:
node (str | int) – start node to begin traversal
attribute (str | int) – the given attribute to collapse over within column
column (str | int) – the column of nodes DataFrame that contains attribute to collapse over
self_edges (bool) – whether to include self edges in the collapsed graph
unwrap (bool) – whether to unwrap the collapsed graph into a single node
verbose (bool) – whether to print out collapse summary information
:returns:A new Graphistry instance with nodes and edges DataFrame containing collapsed nodes and edges given by column attribute – nodes and edges DataFrames contain six new columns collapse_{node | edges} and final_{node | edges}, while original (node, src, dst) columns are left untouched :rtype: Plottable
- drop_nodes(nodes)#
return g with any nodes/edges involving the node id series removed
- filter_edges_by_dict(*args, **kwargs)#
filter edges to those that match all values in filter_dict
- filter_nodes_by_dict(*args, **kwargs)#
filter nodes to those that match all values in filter_dict
- get_degrees(col='degree', degree_in='degree_in', degree_out='degree_out')#
Decorate nodes table with degree info
Edges must be dataframe-like: pandas, cudf, …
Parameters determine generated column names
Warning: Self-cycles are currently double-counted. This may change.
Example: Generate degree columns
edges = pd.DataFrame({'s': ['a','b','c','d'], 'd': ['c','c','e','e']}) g = graphistry.edges(edges, 's', 'd') print(g._nodes) # None g2 = g.get_degrees() print(g2._nodes) # pd.DataFrame with 'id', 'degree', 'degree_in', 'degree_out'
- Parameters:
col (str)
degree_in (str)
degree_out (str)
- get_indegrees(col='degree_in')#
See get_degrees
- Parameters:
col (str)
- get_outdegrees(col='degree_out')#
See get_degrees
- Parameters:
col (str)
- get_topological_levels(level_col='level', allow_cycles=True, warn_cycles=True, remove_self_loops=True)#
Label nodes on column level_col based on topological sort depth Supports pandas + cudf, using parallelism within each level computation Options: * allow_cycles: if False and detects a cycle, throw ValueException, else break cycle by picking a lowest-in-degree node * warn_cycles: if True and detects a cycle, proceed with a warning * remove_self_loops: preprocess by removing self-cycles. Avoids allow_cycles=False, warn_cycles=True messages.
Example:
edges_df = gpd.DataFrame({‘s’: [‘a’, ‘b’, ‘c’, ‘d’],’d’: [‘b’, ‘c’, ‘e’, ‘e’]}) g = graphistry.edges(edges_df, ‘s’, ‘d’) g2 = g.get_topological_levels() g2._nodes.info() # pd.DataFrame with | ‘id’ , ‘level’ |
- Parameters:
level_col (str)
allow_cycles (bool)
warn_cycles (bool)
remove_self_loops (bool)
- Return type:
- hop(*args, **kwargs)#
Given a graph and some source nodes, return subgraph of all paths within k-hops from the sources
This can be faster than the equivalent chain([…]) call that wraps it with additional steps
See chain() examples for examples of many of the parameters
g: Plotter nodes: dataframe with id column matching g._node. None signifies all nodes (default). hops: consider paths of length 1 to ‘hops’ steps, if any (default 1). to_fixed_point: keep hopping until no new nodes are found (ignores hops) direction: ‘forward’, ‘reverse’, ‘undirected’ edge_match: dict of kv-pairs to exact match (see also: filter_edges_by_dict) source_node_match: dict of kv-pairs to match nodes before hopping (including intermediate) destination_node_match: dict of kv-pairs to match nodes after hopping (including intermediate) source_node_query: dataframe query to match nodes before hopping (including intermediate) destination_node_query: dataframe query to match nodes after hopping (including intermediate) edge_query: dataframe query to match edges before hopping (including intermediate) return_as_wave_front: Exclude starting node(s) in return, returning only encountered nodes target_wave_front: Only consider these nodes + self._nodes for reachability engine: ‘auto’, ‘pandas’, ‘cudf’ (GPU)
- keep_nodes(nodes)#
Limit nodes and edges to those selected by parameter nodes For edges, both source and destination must be in nodes Nodes can be a list or series of node IDs, or a dictionary When a dictionary, each key corresponds to a node column, and nodes will be included when all match
- materialize_nodes(reuse=True, engine=EngineAbstract.AUTO)#
Generate g._nodes based on g._edges
Uses g._node for node id if exists, else ‘id’
Edges must be dataframe-like: cudf, pandas, …
When reuse=True and g._nodes is not None, use it
Example: Generate nodes
edges = pd.DataFrame({'s': ['a','b','c','d'], 'd': ['c','c','e','e']}) g = graphistry.edges(edges, 's', 'd') print(g._nodes) # None g2 = g.materialize_nodes() print(g2._nodes) # pd.DataFrame
- Parameters:
reuse (bool)
engine (EngineAbstract | str)
- Return type:
- prune_self_edges()#
- python_remote_g(*args, **kwargs)#
Remotely run Python code on a remote dataset that returns a Plottable
Uses the latest bound _dataset_id, and uploads current dataset if not already bound. Note that rebinding calls of edges() and nodes() reset the _dataset_id binding.
- Parameters:
code (Union[str, Callable[..., object]]) – Python code that includes a top-level function def task(g: Plottable) -> Union[str, Dict].
api_token (Optional[str]) – Optional JWT token. If not provided, refreshes JWT and uses that.
dataset_id (Optional[str]) – Optional dataset_id. If not provided, will fallback to self._dataset_id. If not defined, will upload current data, store that dataset_id, and run code against that.
format (Optional[FormatType]) – What format to fetch results. Defaults to ‘parquet’.
output_type (Optional[OutputTypeGraph]) – What shape of output to fetch. Defaults to ‘all’. Options include ‘nodes’, ‘edges’, ‘all’ (both). For other variants, see python_remote_shape and python_remote_json.
engine (Literal["pandas", "cudf]) – Override which run mode GFQL uses. Defaults to “cudf”.
run_label (Optional[str]) – Optional label for the run for serverside job tracking.
validate (bool) – Whether to locally test code, and if uploading data, the data. Default true.
- Return type:
Any
- Example: Upload data and count the results
import graphistry from graphistry import n, e es = pandas.DataFrame({'src': [0,1,2], 'dst': [1,2,0]}) g1 = graphistry .edges(es, source='src', destination='dst') .upload() assert g1._dataset_id is not None, "Successfully uploaded" g2 = g1.python_remote_g( code=''' from typing import Any, Dict from graphistry import Plottable def task(g: Plottable) -> Dict[str, Any]: return g ''', engine='cudf') num_edges = len(g2._edges) print(f'num_edges: {num_edges}')
- python_remote_json(*args, **kwargs)#
Remotely run Python code on a remote dataset that returns json
Uses the latest bound _dataset_id, and uploads current dataset if not already bound. Note that rebinding calls of edges() and nodes() reset the _dataset_id binding.
- Parameters:
code (Union[str, Callable[..., object]]) – Python code that includes a top-level function def task(g: Plottable) -> Union[str, Dict].
api_token (Optional[str]) – Optional JWT token. If not provided, refreshes JWT and uses that.
dataset_id (Optional[str]) – Optional dataset_id. If not provided, will fallback to self._dataset_id. If not defined, will upload current data, store that dataset_id, and run code against that.
engine (Literal["pandas", "cudf]) – Override which run mode GFQL uses. Defaults to “cudf”.
run_label (Optional[str]) – Optional label for the run for serverside job tracking.
validate (bool) – Whether to locally test code, and if uploading data, the data. Default true.
- Return type:
Any
- Example: Upload data and count the results
import graphistry from graphistry import n, e es = pandas.DataFrame({'src': [0,1,2], 'dst': [1,2,0]}) g1 = graphistry .edges(es, source='src', destination='dst') .upload() assert g1._dataset_id is not None, "Successfully uploaded" obj = g1.python_remote_json( code=''' from typing import Any, Dict from graphistry import Plottable def task(g: Plottable) -> Dict[str, Any]: return {'num_edges': len(g._edges)} ''', engine='cudf') num_edges = obj['num_edges'] print(f'num_edges: {num_edges}')
- python_remote_table(*args, **kwargs)#
Remotely run Python code on a remote dataset that returns a table
Uses the latest bound _dataset_id, and uploads current dataset if not already bound. Note that rebinding calls of edges() and nodes() reset the _dataset_id binding.
- Parameters:
code (Union[str, Callable[..., object]]) – Python code that includes a top-level function def task(g: Plottable) -> Union[str, Dict].
api_token (Optional[str]) – Optional JWT token. If not provided, refreshes JWT and uses that.
dataset_id (Optional[str]) – Optional dataset_id. If not provided, will fallback to self._dataset_id. If not defined, will upload current data, store that dataset_id, and run code against that.
format (Optional[FormatType]) – What format to fetch results. Defaults to ‘parquet’.
output_type (Optional[OutputTypeGraph]) – What shape of output to fetch. Defaults to ‘table’. Options include ‘table’, ‘nodes’, and ‘edges’.
engine (Literal["pandas", "cudf]) – Override which run mode GFQL uses. Defaults to “cudf”.
run_label (Optional[str]) – Optional label for the run for serverside job tracking.
validate (bool) – Whether to locally test code, and if uploading data, the data. Default true.
- Return type:
Any
- Example: Upload data and count the results
import graphistry from graphistry import n, e es = pandas.DataFrame({'src': [0,1,2], 'dst': [1,2,0]}) g1 = graphistry .edges(es, source='src', destination='dst') .upload() assert g1._dataset_id is not None, "Successfully uploaded" edges_df = g1.python_remote_table( code=''' from typing import Any, Dict from graphistry import Plottable def task(g: Plottable) -> Dict[str, Any]: return g._edges ''', engine='cudf') num_edges = len(edges_df) print(f'num_edges: {num_edges}')
- to_cudf()#
Convert to GPU mode by converting any defined nodes and edges to cudf dataframes
When nodes or edges are already cudf dataframes, they are left as is
graphistry.compute.ast module#
- class graphistry.compute.ast.ASTEdge(direction='forward', edge_match=None, hops=1, to_fixed_point=False, source_node_match=None, destination_node_match=None, source_node_query=None, destination_node_query=None, edge_query=None, name=None)#
Bases:
ASTObjectInternal, not intended for use outside of this module.
- Parameters:
direction (Literal['forward', 'reverse', 'undirected'])
edge_match (dict | None)
hops (int | None)
to_fixed_point (bool)
source_node_match (dict | None)
destination_node_match (dict | None)
source_node_query (str | None)
destination_node_query (str | None)
edge_query (str | None)
name (str | None)
- direction: Literal['forward', 'reverse', 'undirected']#
- classmethod from_json(d, validate=True)#
Given c.to_json(), hydrate back c
- Args:
d: Dictionary from to_json() validate: If True (default), validate after parsing
- Returns:
Hydrated AST object
- Raises:
GFQLValidationError: If validate=True and validation fails
- Parameters:
d (dict)
validate (bool)
- Return type:
- to_json(validate=True)#
Returns JSON-compatible dictionry {“type”: “ClassName”, “arg1”: val1, …} Emits all non-reserved instance fields
- Return type:
dict
- class graphistry.compute.ast.ASTEdgeForward(edge_match=None, hops=1, source_node_match=None, destination_node_match=None, to_fixed_point=False, name=None, source_node_query=None, destination_node_query=None, edge_query=None)#
Bases:
ASTEdgeInternal, not intended for use outside of this module.
- Parameters:
edge_match (dict | None)
hops (int | None)
source_node_match (dict | None)
destination_node_match (dict | None)
to_fixed_point (bool)
name (str | None)
source_node_query (str | None)
destination_node_query (str | None)
edge_query (str | None)
- direction: Literal['forward', 'reverse', 'undirected']#
- classmethod from_json(d, validate=True)#
Given c.to_json(), hydrate back c
- Args:
d: Dictionary from to_json() validate: If True (default), validate after parsing
- Returns:
Hydrated AST object
- Raises:
GFQLValidationError: If validate=True and validation fails
- Parameters:
d (dict)
validate (bool)
- Return type:
- class graphistry.compute.ast.ASTEdgeReverse(edge_match=None, hops=1, source_node_match=None, destination_node_match=None, to_fixed_point=False, name=None, source_node_query=None, destination_node_query=None, edge_query=None)#
Bases:
ASTEdgeInternal, not intended for use outside of this module.
- Parameters:
edge_match (dict | None)
hops (int | None)
source_node_match (dict | None)
destination_node_match (dict | None)
to_fixed_point (bool)
name (str | None)
source_node_query (str | None)
destination_node_query (str | None)
edge_query (str | None)
- direction: Literal['forward', 'reverse', 'undirected']#
- classmethod from_json(d, validate=True)#
Given c.to_json(), hydrate back c
- Args:
d: Dictionary from to_json() validate: If True (default), validate after parsing
- Returns:
Hydrated AST object
- Raises:
GFQLValidationError: If validate=True and validation fails
- Parameters:
d (dict)
validate (bool)
- Return type:
- class graphistry.compute.ast.ASTEdgeUndirected(edge_match=None, hops=1, source_node_match=None, destination_node_match=None, to_fixed_point=False, name=None, source_node_query=None, destination_node_query=None, edge_query=None)#
Bases:
ASTEdgeInternal, not intended for use outside of this module.
- Parameters:
edge_match (dict | None)
hops (int | None)
source_node_match (dict | None)
destination_node_match (dict | None)
to_fixed_point (bool)
name (str | None)
source_node_query (str | None)
destination_node_query (str | None)
edge_query (str | None)
- direction: Literal['forward', 'reverse', 'undirected']#
- classmethod from_json(d, validate=True)#
Given c.to_json(), hydrate back c
- Args:
d: Dictionary from to_json() validate: If True (default), validate after parsing
- Returns:
Hydrated AST object
- Raises:
GFQLValidationError: If validate=True and validation fails
- Parameters:
d (dict)
validate (bool)
- Return type:
- class graphistry.compute.ast.ASTNode(filter_dict=None, name=None, query=None)#
Bases:
ASTObjectInternal, not intended for use outside of this module.
- Parameters:
filter_dict (dict | None)
name (str | None)
query (str | None)
- classmethod from_json(d, validate=True)#
Given c.to_json(), hydrate back c
- Args:
d: Dictionary from to_json() validate: If True (default), validate after parsing
- Returns:
Hydrated AST object
- Raises:
GFQLValidationError: If validate=True and validation fails
- Parameters:
d (dict)
validate (bool)
- Return type:
- to_json(validate=True)#
Returns JSON-compatible dictionry {“type”: “ClassName”, “arg1”: val1, …} Emits all non-reserved instance fields
- Return type:
dict
- class graphistry.compute.ast.ASTObject(name=None)#
Bases:
ASTSerializableInternal, not intended for use outside of this module. These are operator-level expressions used as g.chain(List<ASTObject>)
- Parameters:
name (str | None)
- graphistry.compute.ast.assert_record_match(d)#
- Parameters:
d (Dict)
- Return type:
None
- graphistry.compute.ast.e#
alias of
ASTEdgeUndirected
- graphistry.compute.ast.e_forward#
alias of
ASTEdgeForward
- graphistry.compute.ast.e_reverse#
alias of
ASTEdgeReverse
- graphistry.compute.ast.e_undirected#
alias of
ASTEdgeUndirected
- graphistry.compute.ast.from_json(o, validate=True)#
- graphistry.compute.ast.maybe_filter_dict_from_json(d, key)#
- Parameters:
d (Dict)
key (str)
- Return type:
Dict | None
graphistry.compute.ast_temporal module#
- class graphistry.compute.ast_temporal.DateTimeValue(value, timezone='UTC')#
Bases:
TemporalValueTagged datetime value with timezone support
- Parameters:
value (str)
timezone (str)
- as_pandas_value()#
Convert to pandas-compatible value for comparison
- Return type:
Timestamp
- classmethod from_datetime(dt)#
Create from Python datetime
- Parameters:
dt (datetime)
- Return type:
- classmethod from_pandas_timestamp(ts)#
Create from pandas Timestamp
- Parameters:
ts (Timestamp)
- Return type:
- to_json()#
Return dict for tagged temporal value
- Return type:
DateTimeWire
- class graphistry.compute.ast_temporal.DateValue(value)#
Bases:
TemporalValueTagged date value
- Parameters:
value (str)
- as_pandas_value()#
Convert to pandas-compatible value for comparison
- Return type:
Timestamp
- to_json()#
Return dict for tagged temporal value
- Return type:
DateWire
- class graphistry.compute.ast_temporal.TemporalValue#
Bases:
ABCBase class for temporal values with tagging support
- abstract as_pandas_value()#
Convert to pandas-compatible value for comparison
- Return type:
Any
- abstract to_json()#
Serialize to JSON-compatible dictionary
- Return type:
DateTimeWire | DateWire | TimeWire
- class graphistry.compute.ast_temporal.TimeValue(value)#
Bases:
TemporalValueTagged time value
- Parameters:
value (str)
- as_pandas_value()#
Convert to pandas-compatible value for comparison
- Return type:
time
- to_json()#
Return dict for tagged temporal value
- Return type:
TimeWire
- graphistry.compute.ast_temporal.temporal_value_from_json(d)#
Factory function to create temporal value from JSON dict
- Parameters:
d (Dict[str, Any])
- Return type:
graphistry.compute.chain module#
- class graphistry.compute.chain.Chain(chain)#
Bases:
ASTSerializable- Parameters:
chain (List[ASTObject])
- classmethod from_json(d, validate=True)#
Convert a JSON AST into a list of ASTObjects
- Parameters:
d (Dict[str, None | bool | str | float | int | List[None | bool | str | float | int | List[JSONVal] | Dict[str, JSONVal]] | Dict[str, None | bool | str | float | int | List[JSONVal] | Dict[str, JSONVal]]])
validate (bool)
- Return type:
- to_json(validate=True)#
Convert a list of ASTObjects into a JSON AST
- Return type:
Dict[str, None | bool | str | float | int | List[None | bool | str | float | int | List[JSONVal] | Dict[str, JSONVal]] | Dict[str, None | bool | str | float | int | List[JSONVal] | Dict[str, JSONVal]]]
- validate(collect_all=False)#
Override to collect all chain validation errors.
- Parameters:
collect_all (bool)
- Return type:
List[GFQLValidationError] | None
- validate_schema(g, collect_all=False)#
Validate this chain against a graph’s schema without executing.
- Args:
g: Graph to validate against collect_all: If True, collect all errors. If False, raise on first.
- Returns:
If collect_all=True: List of errors (empty if valid) If collect_all=False: None if valid
- Raises:
GFQLSchemaError: If collect_all=False and validation fails
- Parameters:
g (Plottable)
collect_all (bool)
- Return type:
List[GFQLSchemaError] | None
- graphistry.compute.chain.chain(self, ops, engine=EngineAbstract.AUTO, validate_schema=True)#
Chain a list of ASTObject (node/edge) traversal operations
Return subgraph of matches according to the list of node & edge matchers If any matchers are named, add a correspondingly named boolean-valued column to the output
For direct calls, exposes convenience List[ASTObject]. Internal operational should prefer Chain.
Use engine=’cudf’ to force automatic GPU acceleration mode
- Parameters:
- Returns:
Plotter
- Return type:
Plotter
Example: Find nodes of some type
from graphistry.ast import n people_nodes_df = g.chain([ n({"type": "person"}) ])._nodes
Example: Find 2-hop edge sequences with some attribute
from graphistry.ast import e_forward g_2_hops = g.chain([ e_forward({"interesting": True}, hops=2) ]) g_2_hops.plot()
Example: Find any node 1-2 hops out from another node, and label each hop
from graphistry.ast import n, e_undirected g_2_hops = g.chain([ n({g._node: "a"}), e_undirected(name="hop1"), e_undirected(name="hop2") ]) print('# first-hop edges:', len(g_2_hops._edges[ g_2_hops._edges.hop1 == True ]))
Example: Transaction nodes between two kinds of risky nodes
from graphistry.ast import n, e_forward, e_reverse g_risky = g.chain([ n({"risk1": True}), e_forward(to_fixed=True), n({"type": "transaction"}, name="hit"), e_reverse(to_fixed=True), n({"risk2": True}) ]) print('# hits:', len(g_risky._nodes[ g_risky._nodes.hit ]))
Example: Filter by multiple node types at each step using is_in
from graphistry.ast import n, e_forward, e_reverse, is_in g_risky = g.chain([ n({"type": is_in(["person", "company"])}), e_forward({"e_type": is_in(["owns", "reviews"])}, to_fixed=True), n({"type": is_in(["transaction", "account"])}, name="hit"), e_reverse(to_fixed=True), n({"risk2": True}) ]) print('# hits:', len(g_risky._nodes[ g_risky._nodes.hit ]))
Example: Run with automatic GPU acceleration
import cudf import graphistry e_gdf = cudf.from_pandas(df) g1 = graphistry.edges(e_gdf, 's', 'd') g2 = g1.chain([ ... ])
Example: Run with automatic GPU acceleration, and force GPU mode
import cudf import graphistry e_gdf = cudf.from_pandas(df) g1 = graphistry.edges(e_gdf, 's', 'd') g2 = g1.chain([ ... ], engine='cudf')
graphistry.compute.chain_remote module#
- graphistry.compute.chain_remote.chain_remote(self, chain, api_token=None, dataset_id=None, output_type='all', format=None, df_export_args=None, node_col_subset=None, edge_col_subset=None, engine=None, validate=True)#
Remotely run GFQL chain query on a remote dataset.
Uses the latest bound _dataset_id, and uploads current dataset if not already bound. Note that rebinding calls of edges() and nodes() reset the _dataset_id binding.
- Parameters:
chain (Union[Chain, List[ASTObject], Dict[str, JSONVal]]) – GFQL chain query as a Python object or in serialized JSON format
api_token (Optional[str]) – Optional JWT token. If not provided, refreshes JWT and uses that.
dataset_id (Optional[str]) – Optional dataset_id. If not provided, will fallback to self._dataset_id. If not provided, will upload current data, store that dataset_id, and run GFQL against that.
output_type (OutputType) – Whether to return nodes and edges (“all”, default), Plottable with just nodes (“nodes”), or Plottable with just edges (“edges”). For just a dataframe of the resultant graph shape (output_type=”shape”), use instead chain_remote_shape().
format (Optional[FormatType]) – What format to fetch results. We recommend a columnar format such as parquet, which it defaults to when output_type is not shape.
df_export_args (Optional[Dict, str, Any]]) – When server parses data, any additional parameters to pass in.
node_col_subset (Optional[List[str]]) – When server returns nodes, what property subset to return. Defaults to all.
edge_col_subset (Optional[List[str]]) – When server returns edges, what property subset to return. Defaults to all.
engine (Optional[Literal["pandas", "cudf]]) – Override which run mode GFQL uses. By default, inspects graph size to decide.
validate (bool) – Whether to locally test code, and if uploading data, the data. Default true.
self (Plottable)
- Return type:
- Example: Explicitly upload graph and return subgraph where nodes have at least one edge
import graphistry from graphistry import n, e es = pandas.DataFrame({'src': [0,1,2], 'dst': [1,2,0]}) g1 = graphistry.edges(es, 'src', 'dst').upload() assert g1._dataset_id, "Graph should have uploaded" g2 = g1.chain_remote([n(), e(), n()]) print(f'dataset id: {g2._dataset_id}, # nodes: {len(g2._nodes)}')
- Example: Return subgraph where nodes have at least one edge, with implicit upload
import graphistry from graphistry import n, e es = pandas.DataFrame({'src': [0,1,2], 'dst': [1,2,0]}) g1 = graphistry.edges(es, 'src', 'dst') g2 = g1.chain_remote([n(), e(), n()]) print(f'dataset id: {g2._dataset_id}, # nodes: {len(g2._nodes)}')
- Example: Return subgraph where nodes have at least one edge, with implicit upload, and force GPU mode
import graphistry from graphistry import n, e es = pandas.DataFrame({'src': [0,1,2], 'dst': [1,2,0]}) g1 = graphistry.edges(es, 'src', 'dst') g2 = g1.chain_remote([n(), e(), n()], engine='cudf') print(f'dataset id: {g2._dataset_id}, # nodes: {len(g2._nodes)}')
- graphistry.compute.chain_remote.chain_remote_generic(self, chain, api_token=None, dataset_id=None, output_type='all', format=None, df_export_args=None, node_col_subset=None, edge_col_subset=None, engine=None, validate=True)#
- Parameters:
self (Plottable)
chain (Chain | Dict[str, None | bool | str | float | int | List[None | bool | str | float | int | List[JSONVal] | Dict[str, JSONVal]] | Dict[str, None | bool | str | float | int | List[JSONVal] | Dict[str, JSONVal]]] | List[Any])
api_token (str | None)
dataset_id (str | None)
output_type (Literal['all', 'nodes', 'edges', 'shape'])
format (Literal['json', 'csv', 'parquet'] | None)
df_export_args (Dict[str, Any] | None)
node_col_subset (List[str] | None)
edge_col_subset (List[str] | None)
engine (Literal['pandas', 'cudf'] | None)
validate (bool)
- Return type:
Plottable | DataFrame
- graphistry.compute.chain_remote.chain_remote_shape(self, chain, api_token=None, dataset_id=None, format=None, df_export_args=None, node_col_subset=None, edge_col_subset=None, engine=None, validate=True)#
Like chain_remote(), except instead of returning a Plottable, returns a pd.DataFrame of the shape of the resulting graph.
Useful as a fast success indicator that avoids the need to return a full graph when a match finds hits, return just the metadata.
- Example: Upload graph and compute number of nodes with at least one edge
import graphistry es = pandas.DataFrame({'src': [0,1,2], 'dst': [1,2,0]}) g1 = graphistry.edges(es, 'src', 'dst').upload() assert g1._dataset_id, "Graph should have uploaded" shape_df = g1.chain_remote_shape([n(), e(), n()]) print(shape_df)
- Example: Compute number of nodes with at least one edge, with implicit upload, and force GPU mode
import graphistry es = pandas.DataFrame({'src': [0,1,2], 'dst': [1,2,0]}) g1 = graphistry.edges(es, 'src', 'dst') shape_df = g1.chain_remote_shape([n(), e(), n()], engine='cudf') print(shape_df)
- Parameters:
self (Plottable)
chain (Chain | List[ASTObject] | Dict[str, None | bool | str | float | int | List[None | bool | str | float | int | List[JSONVal] | Dict[str, JSONVal]] | Dict[str, None | bool | str | float | int | List[JSONVal] | Dict[str, JSONVal]]])
api_token (str | None)
dataset_id (str | None)
format (Literal['json', 'csv', 'parquet'] | None)
df_export_args (Dict[str, Any] | None)
node_col_subset (List[str] | None)
edge_col_subset (List[str] | None)
engine (Literal['pandas', 'cudf'] | None)
validate (bool)
- Return type:
DataFrame
graphistry.compute.chain_validate module#
graphistry.compute.cluster module#
- class graphistry.compute.cluster.ClusterMixin(*a, **kw)#
Bases:
object- dbscan(min_dist=0.2, min_samples=1, cols=None, kind='nodes', fit_umap_embedding=True, target=False, verbose=False, engine_dbscan='auto', *args, **kwargs)#
- DBSCAN clustering on cpu or gpu infered automatically. Adds a _dbscan column to nodes or edges.
NOTE: g.transform_dbscan(..) currently unsupported on GPU.
Saves model as g._dbscan_nodes or g._dbscan_edges
Examples:
g = graphistry.edges(edf, 'src', 'dst').nodes(ndf, 'node') # cluster by UMAP embeddings kind = 'nodes' | 'edges' g2 = g.umap(kind=kind).dbscan(kind=kind) print(g2._nodes['_dbscan']) | print(g2._edges['_dbscan']) # dbscan in umap or featurize API g2 = g.umap(dbscan=True, min_dist=1.2, min_samples=2, **kwargs) # or, here dbscan is infered from features, not umap embeddings g2 = g.featurize(dbscan=True, min_dist=1.2, min_samples=2, **kwargs) # and via chaining, g2 = g.umap().dbscan(min_dist=1.2, min_samples=2, **kwargs) # cluster by feature embeddings g2 = g.featurize().dbscan(**kwargs) # cluster by a given set of feature column attributes, or with target=True g2 = g.featurize().dbscan(cols=['ip_172', 'location', 'alert'], target=False, **kwargs) # equivalent to above (ie, cols != None and umap=True will still use features dataframe, rather than UMAP embeddings) g2 = g.umap().dbscan(cols=['ip_172', 'location', 'alert'], umap=True | False, **kwargs) g2.plot() # color by `_dbscan` column
- Useful:
Enriching the graph with cluster labels from UMAP is useful for visualizing clusters in the graph by color, size, etc, as well as assessing metrics per cluster, e.g. graphistry/pygraphistry
- Args:
- min_dist float:
The maximum distance between two samples for them to be considered as in the same neighborhood.
- kind str:
‘nodes’ or ‘edges’
- cols:
list of columns to use for clustering given g.featurize has been run, nice way to slice features or targets by fragments of interest, e.g. [‘ip_172’, ‘location’, ‘ssh’, ‘warnings’]
- fit_umap_embedding bool:
whether to use UMAP embeddings or features dataframe to cluster DBSCAN
- min_samples:
The number of samples in a neighborhood for a point to be considered as a core point. This includes the point itself.
- target:
whether to use the target column as the clustering feature
- Parameters:
min_dist (float)
min_samples (int)
cols (List | str | None)
kind (Literal['nodes', 'edges'])
fit_umap_embedding (bool)
target (bool)
verbose (bool)
engine_dbscan (Literal['cuml', 'sklearn', 'auto'])
- transform_dbscan(df, y=None, min_dist='auto', infer_umap_embedding=False, sample=None, n_neighbors=None, kind='nodes', return_graph=True, verbose=False)#
Transforms a minibatch dataframe to one with a new column ‘_dbscan’ containing the DBSCAN cluster labels on the minibatch and generates a graph with the minibatch and the original graph, with edges between the minibatch and the original graph inferred from the umap embedding or features dataframe. Graph nodes | edges will be colored by ‘_dbscan’ column.
Examples:
fit: g = graphistry.edges(edf, 'src', 'dst').nodes(ndf, 'node') g2 = g.featurize().dbscan() predict: :: emb, X, _, ndf = g2.transform_dbscan(ndf, return_graph=False) # or g3 = g2.transform_dbscan(ndf, return_graph=True) g3.plot()
likewise for umap:
fit: g = graphistry.edges(edf, 'src', 'dst').nodes(ndf, 'node') g2 = g.umap(X=.., y=..).dbscan() predict: :: emb, X, y, ndf = g2.transform_dbscan(ndf, ndf, return_graph=False) # or g3 = g2.transform_dbscan(ndf, ndf, return_graph=True) g3.plot()
- Args:
- df:
dataframe to transform
- y:
optional labels dataframe
- min_dist:
The maximum distance between two samples for them to be considered as in the same neighborhood. smaller values will result in less edges between the minibatch and the original graph. Default ‘auto’, infers min_dist from the mean distance and std of new points to the original graph
- fit_umap_embedding:
whether to use UMAP embeddings or features dataframe when inferring edges between the minibatch and the original graph. Default False, uses the features dataframe
- sample:
number of samples to use when inferring edges between the minibatch and the original graph, if None, will only use closest point to the minibatch. If greater than 0, will sample the closest sample points in existing graph to pull in more edges. Default None
- kind:
‘nodes’ or ‘edges’
- return_graph:
whether to return a graph or the (emb, X, y, minibatch df enriched with DBSCAN labels), default True infered graph supports kind=’nodes’ only.
- verbose:
whether to print out progress, default False
- Parameters:
df (DataFrame)
y (DataFrame | None)
min_dist (float | str)
infer_umap_embedding (bool)
sample (int | None)
n_neighbors (int | None)
kind (str)
return_graph (bool)
verbose (bool)
- graphistry.compute.cluster.dbscan_fit_inplace(res, dbscan, kind='nodes', cols=None, use_umap_embedding=True, target=False, verbose=False)#
- Fits clustering on UMAP embeddings if umap is True, otherwise on the features dataframe
or target dataframe if target is True.
- Sets:
res._dbscan_edges or res._dbscan_nodes to the DBSCAN model
res._edges or res._nodes gains column _dbscan
- Args:
- res:
graphistry graph
- kind:
‘nodes’ or ‘edges’
- cols:
list of columns to use for clustering given g.featurize has been run
- use_umap_embedding:
whether to use UMAP embeddings or features dataframe for clustering (default: True)
- target:
whether to use the target dataframe or features dataframe (typically False, for features)
- Parameters:
res (Plottable)
dbscan (Any)
kind (Literal['nodes', 'edges'])
cols (List | str | None)
use_umap_embedding (bool)
target (bool)
verbose (bool)
- Return type:
None
- graphistry.compute.cluster.dbscan_predict_cuml(X, model)#
- Parameters:
X (Any)
model (Any)
- Return type:
Any
- graphistry.compute.cluster.dbscan_predict_sklearn(X, model)#
DBSCAN has no predict per se, so we reverse engineer one here from https://stackoverflow.com/questions/27822752/scikit-learn-predicting-new-points-with-dbscan
- Parameters:
X (DataFrame)
model (Any)
- Return type:
ndarray
- graphistry.compute.cluster.get_model_matrix(g, kind, cols, umap, target)#
Allows for a single function to get the model matrix for both nodes and edges as well as targets, embeddings, and features
- Args:
- g:
graphistry graph
- kind:
‘nodes’ or ‘edges’
- cols:
list of columns to use for clustering given g.featurize has been run
- umap:
whether to use UMAP embeddings or features dataframe
- target:
whether to use the target dataframe or features dataframe
- Returns:
pd.DataFrame: dataframe of model matrix given the inputs
- Parameters:
g (Plottable)
kind (Literal['nodes', 'edges'])
cols (List | str | None)
- Return type:
Any
- graphistry.compute.cluster.make_safe_gpu_dataframes(X, y, engine)#
Coerce a dataframe to pd vs cudf based on engine
- Parameters:
X (Any | None)
y (Any | None)
engine (Engine)
- Return type:
Tuple[Any | None, Any | None]
- graphistry.compute.cluster.resolve_dbscan_engine(engine, g_or_df=None)#
Resolves the engine to use for DBSCAN clustering
If ‘auto’, decide by checking if cuml or sklearn is installed, and if provided, natural type of the dataset. GPU is used if both a GPU dataset and GPU library is installed. Otherwise, CPU library.
- Parameters:
engine (Literal['cuml', 'sklearn', 'auto'])
g_or_df (Any | None)
- Return type:
Literal[‘cuml’, ‘sklearn’]
graphistry.compute.collapse module#
- graphistry.compute.collapse.check_default_columns_present_and_coerce_to_string(g)#
Helper to set COLLAPSE columns to nodes and edges dataframe, while converting src, dst, node to dtype(str) :param g: graphistry instance
- Returns:
graphistry instance
- Parameters:
g (Plottable)
- graphistry.compute.collapse.check_has_set(ndf, parent, child)#
- graphistry.compute.collapse.collapse_algo(g, child, parent, attribute, column, seen)#
Basically candy crush over graph properties in a topology aware manner
Checks to see if child node has desired property from parent, we will need to check if (start_node=parent: has_attribute , children nodes: has_attribute) by case (T, T), (F, T), (T, F) and (F, F),we start recursive collapse (or not) on the children, reassigning nodes and edges.
if (T, T), append children nodes to start_node, re-assign the name of the node, and update the edge table with new name,
if (F, T) start k-(potentially new) super nodes, with k the number of children of start_node. Start node keeps k outgoing edges.
if (T, F) it is the end of the cluster, and we keep new node as is; keep going
if (F, F); keep going
- Parameters:
seen (dict)
g (Plottable) – graphistry instance
child (str | int) – child node to start traversal, for first traversal, set child=parent or vice versa.
parent (str | int) – parent node to start traversal, in main call, this is set to child.
attribute (str | int) – attribute to collapse by
column (str | int) – column in nodes dataframe to collapse over.
- Returns:
graphistry instance with collapsed nodes.
- graphistry.compute.collapse.collapse_by(self, parent, start_node, attribute, column, seen, self_edges=False, unwrap=False, verbose=True)#
Main call in collapse.py, collapses nodes and edges by attribute, and returns normalized graphistry object.
- Parameters:
self (Plottable) – graphistry instance
parent (str | int) – parent node to start traversal, in main call, this is set to child.
start_node (str | int)
attribute (str | int) – attribute to collapse by
column (str | int) – column in nodes dataframe to collapse over.
seen (dict) – dict of previously collapsed pairs – {n1, n2) is seen as different from (n2, n1)
verbose (bool) – bool, default True
self_edges (bool)
unwrap (bool)
- Return type:
:returns graphistry instance with collapsed and normalized nodes.
- graphistry.compute.collapse.collapse_nodes_and_edges(g, parent, child)#
Asserts that parent and child node in ndf should be collapsed into super node. Sets new ndf with COLLAPSE nodes in graphistry instance g
# this asserts that we SHOULD merge parent and child as super node # outside logic controls when that is the case # for example, it assumes parent is already in cluster keys of COLLAPSE node
- Parameters:
g (Plottable) – graphistry instance
parent (str | int) – node with attribute in column
child (str | int) – node with attribute in column
- Returns:
graphistry instance
- graphistry.compute.collapse.get_children(g, node_id, hops=1)#
Helper that gets children at k-hops from node node_id
:returns graphistry instance of hops
- Parameters:
g (Plottable)
node_id (str | int)
hops (int)
- graphistry.compute.collapse.get_cluster_store_keys(ndf, node)#
Main innovation in finding and adding to super node. Checks if node is a segment in any collapse_node in COLLAPSE column of nodes DataFrame
- Parameters:
ndf (DataFrame) – node DataFrame
node (str | int) – node to find
- Returns:
DataFrame of bools of where wrap_key(node) exists in COLLAPSE column
- graphistry.compute.collapse.get_edges_in_out_cluster(g, node_id, attribute, column, directed=True)#
Traverses children of node_id and separates them into incluster and outcluster sets depending if they have attribute in node DataFrame column
- Parameters:
g (Plottable) – graphistry instance
node_id (str | int) – node with attribute in column
attribute (str | int) – attribute to collapse in column over
column (str | int) – column to collapse over
directed (bool)
- graphistry.compute.collapse.get_edges_of_node(g, node_id, outgoing_edges=True, hops=1)#
Gets edges of node at k-hops from node
- Parameters:
g (Plottable) – graphistry instance
node_id (str | int) – node to find edges from
outgoing_edges (bool) – bool, if true, finds all outgoing edges of node, default True
hops (int) – the number of hops from node to take, default = 1
- Returns:
DataFrame of edges
- graphistry.compute.collapse.get_new_node_name(ndf, parent, child)#
If child in cluster group, melts name, else makes new parent_name from parent, child
- Parameters:
ndf (DataFrame) – node DataFrame
parent (str | int) – node with attribute in column
child (str | int) – node with attribute in column
- Return type:
str
:returns new_parent_name
- graphistry.compute.collapse.has_edge(g, n1, n2, directed=True)#
Checks if n1 and n2 share an (directed or not) edge
- Parameters:
g (Plottable) – graphistry instance
n1 (str | int) – node to check if has edge to n2
n2 (str | int) – node to check if has edge to n1
directed (bool) – bool, if True, checks only outgoing edges from n1->`n2`, else finds undirected edges
- Returns:
bool, if edge exists between n1 and n2
- Return type:
bool
- graphistry.compute.collapse.has_property(g, ref_node, attribute, column)#
Checks if ref_node is in node dataframe in column with attribute :param attribute: :param column: :param g: graphistry instance :param ref_node: node to check if it as attribute in column
- Returns:
bool
- Parameters:
g (Plottable)
ref_node (str | int)
attribute (str | int)
column (str | int)
- Return type:
bool
- graphistry.compute.collapse.in_cluster_store_keys(ndf, node)#
checks if node is in collapse_node in COLLAPSE column of nodes DataFrame
- Parameters:
ndf (DataFrame) – nodes DataFrame
node (str | int) – node to find
- Returns:
bool
- Return type:
bool
- graphistry.compute.collapse.melt(ndf, node)#
Reduces node if in cluster store, otherwise passes it through. ex:
node = “4” will take any sequence from get_cluster_store_keys, “1 2 3”, “4 3 6” and returns “1 2 3 4 6” when they have a common entry (3).
:param ndf, node DataFrame :param node: node to melt :returns new_parent_name of super node
- Parameters:
ndf (DataFrame)
node (str | int)
- Return type:
str
- graphistry.compute.collapse.normalize_graph(g, self_edges=False, unwrap=False)#
Final step after collapse traversals are done, removes duplicates and moves COLLAPSE columns into respective(node, src, dst) columns of node, edges dataframe from Graphistry instance g.
- graphistry.compute.collapse.reduce_key(key)#
Takes “1 1 2 1 2 3” -> “1 2 3
- Parameters:
key (str | int) – node name
- Returns:
new node name with duplicates removed
- Return type:
str
- graphistry.compute.collapse.unpack(g)#
Helper method that unpacks graphistry instance
ex:
ndf, edf, src, dst, node = unpack(g)
- Parameters:
g (Plottable) – graphistry instance
- Returns:
node DataFrame, edge DataFrame, source column, destination column, node column
- graphistry.compute.collapse.unwrap_key(name)#
Unwraps node name: ~name~ -> name
- Parameters:
name (str | int) – node to unwrap
- Returns:
unwrapped node name
- Return type:
str
- graphistry.compute.collapse.wrap_key(name)#
Wraps node name -> ~name~
- Parameters:
name (str | int) – node name
- Returns:
wrapped node name
- Return type:
str
graphistry.compute.conditional module#
- class graphistry.compute.conditional.ConditionalMixin(*a, **kw)#
Bases:
Plottable- DGL_graph: Any | None#
- conditional_graph(x, given, kind='nodes', *args, **kwargs)#
conditional_graph – p(x|given) = p(x, given) / p(given)
Useful for finding the conditional probability of a node or edge attribute
returned dataframe sums to 1 on each column
- Parameters:
x – target column
given – the dependent column
kind – ‘nodes’ or ‘edges’
args/kwargs – additional arguments for g.bind(…)
- Returns:
a graphistry instance with the conditional graph edges weighted by the conditional probability. edges are between x and given, keep in mind that g._edges.columns = [given, x, _probs]
- conditional_probs(x, given, kind='nodes', how='index')#
Produces a Dense Matrix of the conditional probability of x given y
- Args:
x: the column variable of interest given the column y=given given : the variabe to fix constant df pd.DataFrame: dataframe how (str, optional): One of ‘column’ or ‘index’. Defaults to ‘index’. kind (str, optional): ‘nodes’ or ‘edges’. Defaults to ‘nodes’.
- Returns:
pd.DataFrame: the conditional probability of x given the column y as dense array like dataframe
- session: ClientSession#
- graphistry.compute.conditional.conditional_probability(x, given, df)#
- conditional probability function over categorical variables
p(x | given) = p(x, given)/p(given)
- Args:
x: the column variable of interest given the column ‘given’ given: the variabe to fix constant df: dataframe with columns [given, x]
- Returns:
pd.DataFrame: the conditional probability of x given the column ‘given’
- Parameters:
df (DataFrame)
- graphistry.compute.conditional.probs(x, given, df, how='index')#
Produces a Dense Matrix of the conditional probability of x given y=given
- Args:
x: the column variable of interest given the column ‘y’ given : the variabe to fix constant df pd.DataFrame: dataframe how (str, optional): One of ‘column’ or ‘index’. Defaults to ‘index’.
- Returns:
pd.DataFrame: the conditional probability of x given the column ‘y’ as dense array like dataframe
- Parameters:
df (DataFrame)
graphistry.compute.filter_by_dict module#
- graphistry.compute.filter_by_dict.filter_by_dict(df, filter_dict=None, engine=EngineAbstract.AUTO)#
return df where rows match all values in filter_dict
- Parameters:
df (Any)
filter_dict (dict | None)
engine (EngineAbstract | str)
- Return type:
Any
- graphistry.compute.filter_by_dict.filter_edges_by_dict(self, filter_dict=None, engine=EngineAbstract.AUTO)#
filter edges to those that match all values in filter_dict
graphistry.compute.hop module#
- graphistry.compute.hop.generate_safe_column_name(base_name, df, prefix='__temp_', suffix='__')#
Generate a temporary column name that doesn’t conflict with existing columns. Uses a simple incrementing counter to avoid dependencies.
Parameters:#
- base_namestr
The original column name to base the temporary name on
- dfDataFrame
The DataFrame to check for column name conflicts
- prefixstr
Prefix to prepend to the temporary column name
- suffixstr
Suffix to append to the temporary column name
Returns:#
- str
A unique column name that doesn’t exist in the DataFrame
- graphistry.compute.hop.hop(self, nodes=None, hops=1, to_fixed_point=False, direction='forward', edge_match=None, source_node_match=None, destination_node_match=None, source_node_query=None, destination_node_query=None, edge_query=None, return_as_wave_front=False, target_wave_front=None, engine=EngineAbstract.AUTO)#
Given a graph and some source nodes, return subgraph of all paths within k-hops from the sources
This can be faster than the equivalent chain([…]) call that wraps it with additional steps
See chain() examples for examples of many of the parameters
g: Plotter nodes: dataframe with id column matching g._node. None signifies all nodes (default). hops: consider paths of length 1 to ‘hops’ steps, if any (default 1). to_fixed_point: keep hopping until no new nodes are found (ignores hops) direction: ‘forward’, ‘reverse’, ‘undirected’ edge_match: dict of kv-pairs to exact match (see also: filter_edges_by_dict) source_node_match: dict of kv-pairs to match nodes before hopping (including intermediate) destination_node_match: dict of kv-pairs to match nodes after hopping (including intermediate) source_node_query: dataframe query to match nodes before hopping (including intermediate) destination_node_query: dataframe query to match nodes after hopping (including intermediate) edge_query: dataframe query to match edges before hopping (including intermediate) return_as_wave_front: Exclude starting node(s) in return, returning only encountered nodes target_wave_front: Only consider these nodes + self._nodes for reachability engine: ‘auto’, ‘pandas’, ‘cudf’ (GPU)
- Parameters:
self (Plottable)
nodes (Any | None)
hops (int | None)
to_fixed_point (bool)
direction (str)
edge_match (dict | None)
source_node_match (dict | None)
destination_node_match (dict | None)
source_node_query (str | None)
destination_node_query (str | None)
edge_query (str | None)
target_wave_front (Any | None)
engine (EngineAbstract | str)
- Return type:
- graphistry.compute.hop.prepare_merge_dataframe(edges_indexed, column_conflict, source_col, dest_col, edge_id_col, node_col, temp_col, is_reverse=False)#
Prepare a merge DataFrame handling column name conflicts for hop operations. Centralizes the conflict resolution logic for both forward and reverse directions.
Parameters:#
- edges_indexedDataFrame
The indexed edges DataFrame
- column_conflictbool
Whether there’s a column name conflict
- source_colstr
The source column name
- dest_colstr
The destination column name
- edge_id_colstr
The edge ID column name
- node_colstr
The node column name
- temp_colstr
The temporary column name to use in case of conflict
- is_reversebool, default=False
Whether to prepare for reverse direction hop
Returns:#
- DataFrame
A merge DataFrame prepared for hop operation
- Parameters:
edges_indexed (Any)
column_conflict (bool)
source_col (str)
dest_col (str)
edge_id_col (str)
node_col (str)
temp_col (str)
is_reverse (bool)
- Return type:
Any
- graphistry.compute.hop.process_hop_direction(direction_name, wave_front_iter, edges_indexed, column_conflict, source_col, dest_col, edge_id_col, node_col, temp_col, intermediate_target_wave_front, base_target_nodes, target_col, node_match_query, node_match_dict, is_reverse, debugging)#
Process a single hop direction (forward or reverse)
Parameters:#
- direction_namestr
Name of the direction for debug logging (‘forward’ or ‘reverse’)
- wave_front_iterDataFrame
Current wave front of nodes to expand from
- edges_indexedDataFrame
The indexed edges DataFrame
- column_conflictbool
Whether there’s a name conflict between node and edge columns
- source_colstr
The source column name
- dest_colstr
The destination column name
- edge_id_colstr
The edge ID column name
- node_colstr
The node column name
- temp_colstr
The temporary column name for conflict resolution
- intermediate_target_wave_frontDataFrame or None
Pre-calculated target wave front for filtering
- base_target_nodesDataFrame
The base target nodes for destination filtering
- target_colstr
The target column for merging (destination or source depending on direction)
- node_match_querystr or None
Optional query for node filtering
- node_match_dictdict or None
Optional dictionary for node filtering
- is_reversebool
Whether this is the reverse direction
- debuggingbool
Whether debug logging is enabled
Returns:#
- Tuple[DataFrame, DataFrame]
The processed hop edges and node IDs
- Parameters:
direction_name (str)
wave_front_iter (Any)
edges_indexed (Any)
column_conflict (bool)
source_col (str)
dest_col (str)
edge_id_col (str)
node_col (str)
temp_col (str)
intermediate_target_wave_front (Any | None)
base_target_nodes (Any)
target_col (str)
node_match_query (str | None)
node_match_dict (dict | None)
is_reverse (bool)
debugging (bool)
- Return type:
Tuple[Any, Any]
- graphistry.compute.hop.query_if_not_none(query, df)#
- Parameters:
query (str | None)
df (Any)
- Return type:
Any
graphistry.compute.python_remote module#
- graphistry.compute.python_remote.python_remote_g(self, code, api_token=None, dataset_id=None, format='parquet', output_type='all', engine='cudf', run_label=None, validate=True)#
Remotely run Python code on a remote dataset that returns a Plottable
Uses the latest bound _dataset_id, and uploads current dataset if not already bound. Note that rebinding calls of edges() and nodes() reset the _dataset_id binding.
- Parameters:
code (Union[str, Callable[..., object]]) – Python code that includes a top-level function def task(g: Plottable) -> Union[str, Dict].
api_token (Optional[str]) – Optional JWT token. If not provided, refreshes JWT and uses that.
dataset_id (Optional[str]) – Optional dataset_id. If not provided, will fallback to self._dataset_id. If not defined, will upload current data, store that dataset_id, and run code against that.
format (Optional[FormatType]) – What format to fetch results. Defaults to ‘parquet’.
output_type (Optional[OutputTypeGraph]) – What shape of output to fetch. Defaults to ‘all’. Options include ‘nodes’, ‘edges’, ‘all’ (both). For other variants, see python_remote_shape and python_remote_json.
engine (Literal["pandas", "cudf]) – Override which run mode GFQL uses. Defaults to “cudf”.
run_label (Optional[str]) – Optional label for the run for serverside job tracking.
validate (bool) – Whether to locally test code, and if uploading data, the data. Default true.
self (Plottable)
- Return type:
- Example: Upload data and count the results
import graphistry from graphistry import n, e es = pandas.DataFrame({'src': [0,1,2], 'dst': [1,2,0]}) g1 = graphistry .edges(es, source='src', destination='dst') .upload() assert g1._dataset_id is not None, "Successfully uploaded" g2 = g1.python_remote_g( code=''' from typing import Any, Dict from graphistry import Plottable def task(g: Plottable) -> Dict[str, Any]: return g ''', engine='cudf') num_edges = len(g2._edges) print(f'num_edges: {num_edges}')
- graphistry.compute.python_remote.python_remote_generic(self, code, api_token=None, dataset_id=None, format='json', output_type='json', engine='cudf', run_label=None, validate=True)#
Remotely run Python code on a remote dataset.
Uses the latest bound _dataset_id, and uploads current dataset if not already bound. Note that rebinding calls of edges() and nodes() reset the _dataset_id binding.
- Parameters:
code (Union[str, Callable[..., object]]) – Python code that includes a top-level function def task(g: Plottable) -> Union[str, Dict].
api_token (Optional[str]) – Optional JWT token. If not provided, refreshes JWT and uses that.
dataset_id (Optional[str]) – Optional dataset_id. If not provided, will fallback to self._dataset_id. If not defined, will upload current data, store that dataset_id, and run code against that.
format (Optional[FormatType]) – What format to fetch results. Defaults to ‘json’. We recommend a columnar format such as parquet.
output_type (Optional[OutputTypeAll]) – What shape of output to fetch. Defaults to ‘json’. Options include ‘nodes’, ‘edges’, ‘all’ (both), ‘table’, ‘shape’, and ‘json’.
engine (Literal["pandas", "cudf]) – Override which run mode GFQL uses. Defaults to “cudf”.
run_label (Optional[str]) – Optional label for the run for serverside job tracking.
validate (bool) – Whether to locally test code, and if uploading data, the data. Default true.
self (Plottable)
- Return type:
Plottable | DataFrame | Any
- Example: Upload data and count the results
import graphistry from graphistry import n, e es = pandas.DataFrame({'src': [0,1,2], 'dst': [1,2,0]}) g1 = graphistry .edges(es, source='src', destination='dst') .upload() assert g1._dataset_id is not None, "Successfully uploaded" out_json = g1.python_remote( code=''' from typing import Any, Dict from graphistry import Plottable def task(g: Plottable) -> Dict[str, Any]: return { 'num_edges': len(g._edges) } ''', engine='cudf') num_edges = out_json['num_edges'] print(f'num_edges: {num_edges}')
- graphistry.compute.python_remote.python_remote_json(self, code, api_token=None, dataset_id=None, engine='cudf', run_label=None, validate=True)#
Remotely run Python code on a remote dataset that returns json
Uses the latest bound _dataset_id, and uploads current dataset if not already bound. Note that rebinding calls of edges() and nodes() reset the _dataset_id binding.
- Parameters:
code (Union[str, Callable[..., object]]) – Python code that includes a top-level function def task(g: Plottable) -> Union[str, Dict].
api_token (Optional[str]) – Optional JWT token. If not provided, refreshes JWT and uses that.
dataset_id (Optional[str]) – Optional dataset_id. If not provided, will fallback to self._dataset_id. If not defined, will upload current data, store that dataset_id, and run code against that.
engine (Literal["pandas", "cudf]) – Override which run mode GFQL uses. Defaults to “cudf”.
run_label (Optional[str]) – Optional label for the run for serverside job tracking.
validate (bool) – Whether to locally test code, and if uploading data, the data. Default true.
self (Plottable)
- Return type:
Any
- Example: Upload data and count the results
import graphistry from graphistry import n, e es = pandas.DataFrame({'src': [0,1,2], 'dst': [1,2,0]}) g1 = graphistry .edges(es, source='src', destination='dst') .upload() assert g1._dataset_id is not None, "Successfully uploaded" obj = g1.python_remote_json( code=''' from typing import Any, Dict from graphistry import Plottable def task(g: Plottable) -> Dict[str, Any]: return {'num_edges': len(g._edges)} ''', engine='cudf') num_edges = obj['num_edges'] print(f'num_edges: {num_edges}')
- graphistry.compute.python_remote.python_remote_table(self, code, api_token=None, dataset_id=None, format='parquet', output_type='table', engine='cudf', run_label=None, validate=True)#
Remotely run Python code on a remote dataset that returns a table
Uses the latest bound _dataset_id, and uploads current dataset if not already bound. Note that rebinding calls of edges() and nodes() reset the _dataset_id binding.
- Parameters:
code (Union[str, Callable[..., object]]) – Python code that includes a top-level function def task(g: Plottable) -> Union[str, Dict].
api_token (Optional[str]) – Optional JWT token. If not provided, refreshes JWT and uses that.
dataset_id (Optional[str]) – Optional dataset_id. If not provided, will fallback to self._dataset_id. If not defined, will upload current data, store that dataset_id, and run code against that.
format (Optional[FormatType]) – What format to fetch results. Defaults to ‘parquet’.
output_type (Optional[OutputTypeGraph]) – What shape of output to fetch. Defaults to ‘table’. Options include ‘table’, ‘nodes’, and ‘edges’.
engine (Literal["pandas", "cudf]) – Override which run mode GFQL uses. Defaults to “cudf”.
run_label (Optional[str]) – Optional label for the run for serverside job tracking.
validate (bool) – Whether to locally test code, and if uploading data, the data. Default true.
self (Plottable)
- Return type:
DataFrame
- Example: Upload data and count the results
import graphistry from graphistry import n, e es = pandas.DataFrame({'src': [0,1,2], 'dst': [1,2,0]}) g1 = graphistry .edges(es, source='src', destination='dst') .upload() assert g1._dataset_id is not None, "Successfully uploaded" edges_df = g1.python_remote_table( code=''' from typing import Any, Dict from graphistry import Plottable def task(g: Plottable) -> Dict[str, Any]: return g._edges ''', engine='cudf') num_edges = len(edges_df) print(f'num_edges: {num_edges}')
- graphistry.compute.python_remote.validate_python_str(code)#
Validate Python code string.
Returns True if the code string is valid, otherwise return False or raise ValueError
- Parameters:
code (str)
- Return type:
bool