pyspark-bbn

Data

There are two types of datasets.

  • A dataset with all discrete/categorical variables.

  • A dataset with all continuous variables.

Discrete Data

class pyspark_bbn.discrete.data.CartesianMiComputer

Bases: object

Compute information measures via Cartesian products.

static get_cmi(sdf: DataFrame, triplets: RDD[Tuple[List[str], List[str], List[str]]]) DataFrame

Compute conditional mutual information for many triplets.

Parameters:
  • sdf – Spark data frame.

  • triplets – RDD of column triplets.

Returns:

DataFrame of conditional mutual information values.

static get_mi(sdf: DataFrame, pairs: RDD[Tuple[List[str], List[str]]]) DataFrame

Compute mutual information for many pairs.

Parameters:
  • sdf – Spark data frame.

  • pairs – RDD of column pairs.

Returns:

DataFrame of mutual information values.
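
As a minimal usage sketch (the SparkSession setup, toy data, and column names below are assumptions, not part of the library), each element of the pairs RDD is a tuple of two column lists, following the type annotation above:

    from pyspark.sql import SparkSession
    from pyspark_bbn.discrete.data import CartesianMiComputer

    spark = SparkSession.builder.getOrCreate()

    # toy discrete data (assumed); columns a, b, c are categorical
    sdf = spark.createDataFrame(
        [('t', 't', 'f'), ('t', 'f', 'f'), ('f', 't', 't'), ('f', 'f', 't')],
        ['a', 'b', 'c'])

    # each RDD element is (x_cols, y_cols)
    pairs = spark.sparkContext.parallelize([(['a'], ['b']), (['a'], ['c'])])

    mi_sdf = CartesianMiComputer.get_mi(sdf, pairs)  # DataFrame of MI values
    mi_sdf.show()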

class pyspark_bbn.discrete.data.DataFrameMiComputer

Bases: object

Compute information measures returning DataFrames.

static get_cmi(sdf: DataFrame, triplet_rdd) DataFrame

Compute conditional mutual information for many triplets.

Parameters:
  • sdf – Spark data frame.

  • triplet_rdd – RDD of column triplets with values.

Returns:

DataFrame of conditional mutual information values.

static get_mi(sdf: DataFrame, pairs_rdd) DataFrame

Compute mutual information for many pairs.

Parameters:
  • sdf – Spark data frame.

  • pairs_rdd – RDD of column pairs with values.

Returns:

DataFrame of mutual information values.

class pyspark_bbn.discrete.data.DiscreteData(sdf: DataFrame)

Bases: object

Discrete data.

__init__(sdf: DataFrame)

Initialize the discrete dataset.

Parameters:

sdf – Spark dataframe.

static compute_bd_score(X: List[str], Y: List[str], XY_vals: List[Tuple[str, ...]], counts: Dict[str, int]) Tuple[List[str], List[str], float]

Computes the Bayesian Dirichlet score.

Parameters:
  • X – Child.

  • Y – Parent.

  • XY_vals – XY values.

  • counts – Counts.

Returns:

Tuple of X, Y and score.

static compute_cmi(X: List[str], Y: List[str], Z: List[str], XYZ_vals: List[Tuple[str, ...]], counts: Dict[str, int], n: int) Tuple[List[str], List[str], List[str], float]

Computes the conditional mutual information of X and Y given Z.

Parameters:
  • X – X variables.

  • Y – Y variables.

  • Z – Z variables.

  • XYZ_vals – XYZ values.

  • counts – Counts.

  • n – Number of data points.

Returns:

Tuple of X, Y, Z and conditional mutual information.

static compute_mi(X: List[str], Y: List[str], XY_vals: List[Tuple[str, ...]], counts: Dict[str, int], n: int) Tuple[List[str], List[str], float]

Computes the mutual information.

Parameters:
  • X – X variables.

  • Y – Y variables.

  • XY_vals – XY values.

  • counts – Counts.

  • n – Total data points.

Returns:

Tuple of X, Y and mutual information.

drop(columns: List[str])

Drops the specified columns.

Parameters:

columns – List of columns.

Returns:

New dataset without the dropped columns.

static get_bd(scoring_pairs: List[ScoringPair], counts: Dict[str, int]) List[Tuple[List[str], List[str], float]]

Computes the Bayesian Dirichlet score.

Parameters:
  • scoring_pairs – Scoring pairs.

  • counts – Counts.

Returns:

List of Bayesian Dirichlet scores.

get_bd_par(pairs: List[ScoringPair]) List[Tuple[List[str], List[str], float]]

Computes the Bayesian Dirichlet score for each pair of variables (in parallel).

Parameters:

pairs – Scoring pairs.

Returns:

List of Bayesian Dirichlet scores.

static get_cmi(triplets: List[Triplet], counts: Dict[str, int], n: int) List[Tuple[List[str], List[str], List[str], float]]

Computes the conditional mutual information for each triplet of variables.

Parameters:
  • triplets – List of triplet variables.

  • counts – Count dictionary.

  • n – Number of data points.

Returns:

List of conditional mutual information.

get_cmi_par(triplets: List[Triplet]) List[Tuple[List[str], List[str], List[str], float]]

Computes the conditional mutual information for each triplet of variables (parallel).

Parameters:

triplets – List of triplets.

Returns:

List of conditional mutual information.

get_counts_for_bd(scoring_pairs: List[ScoringPair]) Dict[str, int]

Gets the counts required to compute Bayesian Dirichlet scoring.

Parameters:

scoring_pairs – List of scoring pairs (child, parents).

Returns:

Dictionary of counts.

get_counts_for_cmi(triplets: List[Triplet]) Dict[str, int]

Gets the counts required to compute conditional mutual information.

Parameters:

triplets – List of triplets.

Returns:

Dictionary of counts.

get_counts_for_mi(pairs: List[Pair]) Dict[str, int]

Gets the counts required to compute pairwise mutual information.

Parameters:

pairs – List of pairs.

Returns:

Dictionary of counts.

get_domains() Dict[str, List[str]]

Gets the domain of each field.

Returns:

Domain of each field.

static get_mi(pairs: List[Pair], counts: Dict[str, int], n: int) List[Tuple[List[str], List[str], float]]

Computes the mutual information for each pair of variables.

Parameters:
  • pairs – List of pairs of variables.

  • counts – Count dictionary.

  • n – Number of data points.

Returns:

List of mutual information.

get_mi_par(pairs: List[Pair]) List[Tuple[List[str], List[str], float]]

Computes the mutual information for each pair of variables (in parallel).

Parameters:

pairs – List of pairs of variables.

Returns:

List of mutual information.

get_profile() Dict[str, Dict[str, int]]

Gets the data profile.

Returns:

Dictionary whose keys are variable names and whose values are value-frequency dictionaries.

get_spark_context() SparkContext

Get SparkContext.

Returns:

SparkContext.

get_spark_session() SparkSession

Get SparkSession.

Returns:

SparkSession.
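
The following is a minimal sketch of typical DiscreteData usage (the toy data and column names are assumptions); get_pairs is the module-level helper documented below:

    from pyspark.sql import SparkSession
    from pyspark_bbn.discrete.data import DiscreteData, get_pairs

    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame(
        [('t', 't', 'f'), ('t', 'f', 'f'), ('f', 't', 't'), ('f', 'f', 't')],
        ['a', 'b', 'c'])

    data = DiscreteData(sdf)
    print(data.get_domains())     # domain (list of values) per variable
    print(data.get_profile())     # value-frequency dictionary per variable

    pairs = get_pairs(data)       # all pairs of variables
    mis = data.get_mi_par(pairs)  # [(X, Y, mutual information), ...]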

class pyspark_bbn.discrete.data.Pair(X: List[str], Y: List[str], profile: Dict[str, Dict[str, int]])

Bases: object

Pair. Useful for computing pairwise statistics.

__init__(X: List[str], Y: List[str], profile: Dict[str, Dict[str, int]])

Initialize the pair.

Parameters:
  • X – List of X variables.

  • Y – List of Y variables.

  • profile – Profile of variables.

get_entries(r: Row) List[Tuple[str, int]]

Gets the entries used for counting.

Parameters:

r – Row.

Returns:

Keys used for counting.

get_entries_par(r: Row) List[Tuple[Tuple[str, str], int]]

Get keys for counting using an identifier.

Parameters:

r – Row.

Returns:

Keys paired with counts.

class pyspark_bbn.discrete.data.PairwiseMiComputer

Bases: object

Utilities for computing pairwise mutual information.

static get_cmi(sdf: DataFrame, x_cols: List[str], y_cols: List[str], z_cols: List[str]) float

Compute conditional mutual information.

Parameters:
  • sdf – Spark data frame.

  • x_cols – Columns for X.

  • y_cols – Columns for Y.

  • z_cols – Conditioning columns Z.

Returns:

Conditional mutual information.

static get_mi(sdf: DataFrame, x_cols: List[str], y_cols: List[str]) float

Compute mutual information between two variable sets.

Parameters:
  • sdf – Spark data frame.

  • x_cols – Columns for X.

  • y_cols – Columns for Y.

Returns:

Mutual information.

class pyspark_bbn.discrete.data.ScoringPair(X: List[str], Y: List[str], profile: Dict[str, Dict[str, int]])

Bases: Pair

Scoring pair X, Y. Useful for Bayesian Dirichlet scoring.

__init__(X: List[str], Y: List[str], profile: Dict[str, Dict[str, int]])

Initialize the scoring pair.

Parameters:
  • X – Child.

  • Y – Parents.

  • profile – Variable profile.

get_entries(r: Row) List[Tuple[str, int]]

Get entries used for counting.

Parameters:

r – Row.

Returns:

Keys and counts.

get_entries_par(r: Row) List[Tuple[Tuple[str, str], int]]

Get entries with identifiers for counting.

Parameters:

r – Row.

Returns:

Keys paired with counts.

class pyspark_bbn.discrete.data.Triplet(X: List[str], Y: List[str], Z: List[str], profile: Dict[str, Dict[str, int]])

Bases: object

Triplet X, Y, Z. Useful for computing conditional statistics.

__init__(X: List[str], Y: List[str], Z: List[str], profile: Dict[str, Dict[str, int]])

Initialize the triplet.

Parameters:
  • X – List of X variables.

  • Y – List of Y variables.

  • Z – List of Z variables.

  • profile – Profile of variables.

get_entries(r: Row) List[Tuple[str, int]]

Gets the entries used for counting.

Parameters:

r – Row.

Returns:

Keys used for counting.

get_entries_par(r: Row) List[Tuple[Tuple[str, str], int]]

Get entries with identifiers for counting.

Parameters:

r – Row.

Returns:

Keys paired with counts.

pyspark_bbn.discrete.data.get_all_pair_filters(xy: List[Tuple[List[str], List[str]]], domains: Dict[str, List[str]]) Generator[Tuple[List[str], List[str], List[str], List[str]], None, None]

For each (x_cols, y_cols) pair in xy, yield all combinations of (x_cols, y_cols, x_vals, y_vals) using get_filters.

pyspark_bbn.discrete.data.get_pair_filters(x_cols: List[str], y_cols: List[str], domains: Dict[str, List[str]]) Generator[Tuple[List[str], List[str], List[str], List[str]], None, None]

Generate all possible combinations of (x_vals, y_vals) for given x_cols and y_cols based on provided value domains.

pyspark_bbn.discrete.data.get_pairs(data: DiscreteData) List[Pair]

Gets a list of pairs of variables.

Parameters:

data – Data.

Returns:

List of pairs.

Continuous Data

class pyspark_bbn.continuous.data.CondMvn(index_1: List[int], index_2: List[int], means: ndarray, cov: ndarray, zero=1e-06, cols_1: List[str] = None, cols_2: List[str] = None)

Bases: object

Conditional multivariate normal.

__init__(index_1: List[int], index_2: List[int], means: ndarray, cov: ndarray, zero=1e-06, cols_1: List[str] = None, cols_2: List[str] = None)

Initialize the conditional multivariate normal.

Parameters:
  • index_1 – Index of dependent variables.

  • index_2 – Index of conditioning variables.

  • means – Means.

  • cov – Covariance matrix.

  • zero – Threshold below which to consider a probability as zero.

  • cols_1 – Names corresponding to index_1.

  • cols_2 – Names corresponding to index_2.

equals(other: Any) bool

Checks if this is equal to other.

Parameters:

other – CondMvn.

Returns:

Boolean.

log_proba(x: ndarray) ndarray

Estimate the log conditional probability of the specified data point.

Parameters:

x – Data point.

Returns:

Log probability.

static partition_cov(cov: ndarray, index_1: List[int], index_2: List[int]) Tuple[ndarray, ndarray, ndarray, ndarray]

Partitions the covariance matrix.

Parameters:
  • cov – Covariance matrix.

  • index_1 – Index.

  • index_2 – Index.

Returns:

Partitioned covariance matrix.

static partition_means(means: ndarray, index_1: List[int], index_2: List[int]) Tuple[ndarray, ndarray]

Partitions the means.

Parameters:
  • means – Means.

  • index_1 – Index.

  • index_2 – Index.

Returns:

Partitioned means.

static partition_x(x: ndarray, index_1: List[int], index_2: List[int]) Tuple[ndarray, ndarray]

Partitions the data point.

Parameters:
  • x – Data point.

  • index_1 – Index.

  • index_2 – Index.

Returns:

Tuple of data point partitioned.

pdf(x: ndarray) float

Estimate the conditional probability of the specified data point.

Parameters:

x – Data point.

Returns:

Probability.
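
A minimal sketch of conditioning a toy bivariate normal (the means, covariance, and column names are assumptions):

    import numpy as np
    from pyspark_bbn.continuous.data import CondMvn

    means = np.array([0.0, 0.0])
    cov = np.array([[1.0, 0.5],
                    [0.5, 1.0]])

    # model x0 conditioned on x1
    cmvn = CondMvn(index_1=[0], index_2=[1], means=means, cov=cov,
                   cols_1=['x0'], cols_2=['x1'])

    # a full data point (x0, x1); partitioned internally via partition_x
    x = np.array([0.2, 1.0])
    print(cmvn.pdf(x))        # conditional density of x0 given x1 = 1.0
    print(cmvn.log_proba(x))  # log of the conditional density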

class pyspark_bbn.continuous.data.GaussianData(sdf: DataFrame, n_samples=6, spark=None)

Bases: object

Gaussian data.

__init__(sdf: DataFrame, n_samples=6, spark=None)

Initialize the Gaussian dataset.

Parameters:
  • sdf – Spark data frame.

  • n_samples – Number of samples.

  • spark – Spark object.

drop(columns: List[str])

Drops specified columns.

Parameters:

columns – List of columns.

Returns:

Gaussian data.

get_cmi_par(triplets: List[Triplet]) List[Tuple[List[str], List[str], List[str], float]]

Computes conditional mutual information between triplets in parallel.

Parameters:

triplets – List of triplets (of variables).

Returns:

List of conditional mutual information.

get_covariance() ndarray

Gets the covariance matrix.

Returns:

Covariance matrix.

get_means() ndarray

Get means.

Returns:

List of means.

get_mi_par(pairs: List[Pair]) List[Tuple[List[str], List[str], float]]

Computes mutual information between the pairs of variables in parallel.

Parameters:

pairs – List of pairs (of variables).

Returns:

List of mutual information.

get_min_max(columns: List[str]) Dict[str, Dict[str, int | float]]

Get dictionary of min/max.

Parameters:

columns – Variable names.

Returns:

Dictionary of min/max associated with names.

get_min_max_for(column: str) Dict[str, int | float]

Get min/max value for specified variable.

Parameters:

column – Variable name.

Returns:

Dictionary of min/max.

get_mvn(columns: List[str]) Mvn

Gets a multivariate normal instance.

Parameters:

columns – List of variable names.

Returns:

Multivariate normal.

get_pair(x: List[str], y: List[str]) Pair

Gets a pair.

Parameters:
  • x – X variables.

  • y – Y variables.

Returns:

Pair.

get_pairs(col_pairs: List[Tuple[List[str], List[str]]] = None) List[Pair]

Gets list of pairs.

Parameters:

col_pairs – List of column pairs.

Returns:

List of pairs.

get_pairwise_columns() Generator[Tuple[List[str], List[str]], None, None]

Gets pairs of columns.

Returns:

Generator of pairs of columns; each column is wrapped in a list.

get_profile() Dict[str, Dict[str, float]]

Gets profile of variables.

Returns:

Dictionary; keys are variable names and values are summary stats.

get_score_par(cmvns: List[CondMvn]) List[Tuple[List[str], List[str], float]]

Computes the scores.

Parameters:

cmvns – List of conditional multivariate gaussian distributions.

Returns:

List of scores.

get_triplet(x: List[str], y: List[str], z: List[str]) Triplet

Gets a triplet.

Parameters:
  • x – X variables.

  • y – Y variables.

  • z – Z variables.

Returns:

Triplet.

get_triplets(col_triplets: List[Tuple[List[str], List[str], List[str]]])

Gets list of triplets.

Parameters:

col_triplets – List of column triplets.

Returns:

List of triplets.

slice_covariance(columns: List[str]) ndarray

Slices covariance matrix according to variables.

Parameters:

columns – List of variables.

Returns:

Covariance matrix.

slice_means(columns: List[str]) ndarray

Slices means vector according to variables.

Parameters:

columns – List of variables.

Returns:

List of means.
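
A minimal sketch of typical GaussianData usage (the synthetic data and column names are assumptions):

    import random
    from pyspark.sql import SparkSession
    from pyspark_bbn.continuous.data import GaussianData

    spark = SparkSession.builder.getOrCreate()
    rows = [(random.gauss(0, 1), random.gauss(0, 1), random.gauss(0, 1))
            for _ in range(1000)]
    sdf = spark.createDataFrame(rows, ['x', 'y', 'z'])

    data = GaussianData(sdf, spark=spark)
    print(data.get_means())             # mean vector
    print(data.get_covariance())        # covariance matrix

    pair = data.get_pair(['x'], ['y'])  # Pair of X and Y
    print(pair.get_mi())                # mutual information

    pairs = data.get_pairs()            # all pairwise column pairs
    mis = data.get_mi_par(pairs)        # [(X, Y, MI), ...]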

class pyspark_bbn.continuous.data.Mvn(columns: List[str], means: ndarray, cov: ndarray, profile: Dict[str, Dict[str, float]], n_samples=10)

Bases: object

Multivariate normal distribution.

__init__(columns: List[str], means: ndarray, cov: ndarray, profile: Dict[str, Dict[str, float]], n_samples=10)

Initialize the multivariate normal distribution.

Parameters:
  • columns – List of variable names.

  • means – Vector means.

  • cov – Covariance matrix.

  • profile – Dictionary of min/max for each variable.

  • n_samples – Number of samples.

get_values() Generator[Tuple[float, ...], None, None]

Gets the sampled values.

Returns:

Generator of values.

pdf(x: ndarray) float

Estimate the probability of the specified data point.

Parameters:

x – Data point.

Returns:

Probability.

class pyspark_bbn.continuous.data.Pair(X: Mvn, Y: Mvn, XY: Mvn)

Bases: object

Pair of variables.

__init__(X: Mvn, Y: Mvn, XY: Mvn)

Initialize the pair.

Parameters:
  • X – X variables.

  • Y – Y variables.

  • XY – XY variables.

get_mi() float

Computes the mutual information.

Returns:

Mutual information.

get_partial_mi(dp: ndarray) float

Computes the partial mutual information.

Parameters:

dp – Data point.

Returns:

Partial mutual information.

get_values() Generator[Tuple[float, ...], None, None]

Gets the XY values.

Returns:

Generator of XY values.

class pyspark_bbn.continuous.data.Triplet(x_cols: List[str], y_cols: List[str], z_cols: List[str], Z: Mvn, XZ: Mvn, YZ: Mvn, XYZ: Mvn)

Bases: object

Triplet variables.

__init__(x_cols: List[str], y_cols: List[str], z_cols: List[str], Z: Mvn, XZ: Mvn, YZ: Mvn, XYZ: Mvn)

Initialize the triplet.

Parameters:
  • x_cols – X columns.

  • y_cols – Y columns.

  • z_cols – Z columns.

  • Z – Z variables.

  • XZ – XZ variables.

  • YZ – YZ variables.

  • XYZ – XYZ variables.

get_cmi() float

Computes the conditional mutual information.

Returns:

Conditional mutual information.

get_partial_mi(dp: ndarray) float

Computes the partial mutual information.

Parameters:

dp – Data point.

Returns:

Partial mutual information.

get_values() Generator[Tuple[float, ...], None, None]

Gets the XYZ values.

Returns:

Generator of XYZ values.

Structure Learning

There are two broad classes of structure learning: constraint-based (CB) and search-and-scoring (SS). CB structure learning typically uses independence and conditional independence tests to learn the network structure. SS structure learning uses a scoring measure over a search space to find the best scoring structures.

The CB structure learning algorithms are as follows; some are appropriate for learning classification models.

  • Naive Bayes: Creates a naive Bayes model (classification).

  • Tree-Augmented Network (TAN): Creates a tree-augmented network (classification).

  • BN augmented naive Bayes (BAN): A modified BAN algorithm (classification).

  • Chow-Liu (aka Maximum Weight Spanning Tree, MWST): Creates a tree-structured BBN.

  • Three-Phase Dependency Analysis (TPDA): Uses TPDA (draft, thicken and thin).

  • PC Algorithm: Uses the PC algorithm.

Only one search-and-scoring algorithm is implemented; it uses a genetic algorithm.

  • Genetic Algorithm (GA): Discovers a highly scoring network structure.

Discrete, Constraint-Based

Constraint-based structure learning for discrete data.

class pyspark_bbn.discrete.scblearn.Ban(data: DiscreteData, clazz: str, cmi_threshold=0.06, method='pc')

Bases: BaseStructureLearner

Modified Bayesian network augmented naive Bayes (BAN). See Bayesian Network Classifiers.

__init__(data: DiscreteData, clazz: str, cmi_threshold=0.06, method='pc')

Initialize the BAN learner.

Parameters:
  • data – Data.

  • clazz – Class variable.

  • cmi_threshold – Threshold above which variables are considered conditionally dependent.

  • method – Either pc or tpda (default=pc).

get_network() DiGraph

Learn the BAN network.

Returns:

Directed graph.

class pyspark_bbn.discrete.scblearn.BaseStructureLearner(data: DiscreteData)

Bases: object

Base structure learner.

__init__(data: DiscreteData)

Initialize the learner.

Parameters:

data – Data.

get_network() DiGraph

Gets the network structure.

class pyspark_bbn.discrete.scblearn.Mwst(data: DiscreteData, cmi_threshold=0.06)

Bases: BaseStructureLearner

Maximum weight spanning tree.

__init__(data: DiscreteData, cmi_threshold=0.06)

Initialize the MWST learner.

Parameters:
  • data – Data.

  • cmi_threshold – Threshold above which variables are considered conditionally dependent.

get_network() DiGraph

Learn the MWST-based network.

Returns:

Directed graph.
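
A minimal sketch (sdf is assumed to be a Spark DataFrame of discrete variables):

    from pyspark_bbn.discrete.data import DiscreteData
    from pyspark_bbn.discrete.scblearn import Mwst

    data = DiscreteData(sdf)
    g = Mwst(data, cmi_threshold=0.06).get_network()  # networkx DiGraph
    print(list(g.edges()))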

class pyspark_bbn.discrete.scblearn.Naive(data: DiscreteData, clazz: str)

Bases: BaseStructureLearner

Naive Bayesian network. An arc is drawn from the clazz variable/node to every other node.

__init__(data: DiscreteData, clazz: str)

Initialize the naive Bayes learner.

Parameters:
  • data – Data.

  • clazz – The clazz node.

get_network() DiGraph

Learn the naive Bayes network.

Returns:

Directed graph.

class pyspark_bbn.discrete.scblearn.Pc(data: DiscreteData, cmi_threshold=0.06)

Bases: BaseStructureLearner

PC algorithm. See A fast PC algorithm for high dimensional causal discovery with multi-core PCs.

__init__(data: DiscreteData, cmi_threshold=0.06)

Initialize the PC learner.

Parameters:
  • data – Data.

  • cmi_threshold – Threshold above which variables are considered conditionally dependent.

get_network() DiGraph

Learn the network structure using the PC algorithm.

Returns:

Directed graph.

learn_undirected_graph() Graph

Learns an undirected graph.

Returns:

Undirected graph.
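
A minimal sketch (data is assumed to be a DiscreteData instance):

    from pyspark_bbn.discrete.scblearn import Pc

    learner = Pc(data, cmi_threshold=0.06)
    u = learner.learn_undirected_graph()  # undirected skeleton (networkx Graph)
    g = learner.get_network()             # directed structure (networkx DiGraph)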

class pyspark_bbn.discrete.scblearn.Tan(data: DiscreteData, clazz: str, cmi_threshold=0.06)

Bases: Mwst

Tree-augmented network. See Comparing Bayesian Network Classifiers.

__init__(data: DiscreteData, clazz: str, cmi_threshold=0.06)

Initialize the TAN learner.

Parameters:
  • data – Data.

  • clazz – The clazz node.

  • cmi_threshold – Threshold above which variables are considered conditionally dependent.

get_network() DiGraph

Learn the TAN network.

Returns:

Directed graph.

class pyspark_bbn.discrete.scblearn.Tpda(data: DiscreteData, cmi_threshold=0.06)

Bases: Mwst

Three-phase dependency analysis (TPDA). See Learning Belief Networks from Data: An Information Theory Based Approach.

__init__(data: DiscreteData, cmi_threshold=0.06)

Initialize the TPDA learner.

Parameters:
  • data – Data.

  • cmi_threshold – Threshold above which variables are considered conditionally dependent.

static get_cmi(mi_rdd: RDD, g: Graph, mis: List[Tuple[str, str, float]], profile: Dict[str, Dict[str, int]])

Compute conditional mutual information.

Parameters:
  • mi_rdd – RDD of mutual information values.

  • g – Graph structure.

  • mis – Mutual information scores.

  • profile – Data profile.

Returns:

RDD of conditional mutual information values.

get_network() DiGraph

Learn the TPDA network.

Returns:

Directed graph.

learn_undirected_graph() Graph

Learns an undirected graph.

Returns:

Undirected graph.

Continuous, Constraint-Based

Constraint-based structure learning for continuous data.

class pyspark_bbn.continuous.scblearn.Ban(data: GaussianData, clazz: str, cmi_threshold=0.0001, method='pc')

Bases: BaseStructureLearner

Modified Bayesian network augmented naive Bayes (BAN). See Bayesian Network Classifiers.

__init__(data: GaussianData, clazz: str, cmi_threshold=0.0001, method='pc')

Initialize the BAN learner.

Parameters:
  • data – Data.

  • clazz – Class variable.

  • cmi_threshold – Threshold above which variables are considered conditionally dependent.

  • method – Either pc or tpda (default=pc).

get_network() DiGraph

Learn the BAN network.

Returns:

Directed graph.

class pyspark_bbn.continuous.scblearn.BaseStructureLearner(data: GaussianData)

Bases: object

Base structure learner.

__init__(data: GaussianData)

Initialize the learner.

Parameters:

data – Data.

get_network() DiGraph

Gets the network structure.

class pyspark_bbn.continuous.scblearn.Mwst(data: GaussianData, cmi_threshold=0.01)

Bases: BaseStructureLearner

Maximum weight spanning tree.

__init__(data: GaussianData, cmi_threshold=0.01)

Initialize the MWST learner.

Parameters:
  • data – Data.

  • cmi_threshold – Threshold above which variables are considered conditionally dependent.

get_network() DiGraph

Learn the MWST-based network.

Returns:

Directed graph.

class pyspark_bbn.continuous.scblearn.Naive(data: GaussianData, clazz: str)

Bases: BaseStructureLearner

Naive Bayesian network. An arc is drawn from the clazz variable/node to every other node.

__init__(data: GaussianData, clazz: str)

Initialize the naive Bayes learner.

Parameters:
  • data – Data.

  • clazz – The clazz node.

get_network() DiGraph

Learn the naive Bayes network.

Returns:

Directed graph.

class pyspark_bbn.continuous.scblearn.Pc(data: GaussianData, cmi_threshold=0.0001)

Bases: BaseStructureLearner

PC algorithm. See A fast PC algorithm for high dimensional causal discovery with multi-core PCs.

__init__(data: GaussianData, cmi_threshold=0.0001)

Initialize the PC learner.

Parameters:
  • data – Data.

  • cmi_threshold – Threshold above which variables are considered conditionally dependent.

get_network() DiGraph

Learn the network structure using the PC algorithm.

Returns:

Directed graph.

learn_undirected_graph() Graph

Learns an undirected graph.

Returns:

Undirected graph.

class pyspark_bbn.continuous.scblearn.Tan(data: GaussianData, clazz: str, cmi_threshold=0.01)

Bases: Mwst

Tree-augmented network. See Comparing Bayesian Network Classifiers.

__init__(data: GaussianData, clazz: str, cmi_threshold=0.01)

Initialize the TAN learner.

Parameters:
  • data – Data.

  • clazz – The clazz node.

  • cmi_threshold – Threshold at or above which variables are considered conditionally dependent.

get_network() DiGraph

Learn the TAN network.

Returns:

Directed graph.

class pyspark_bbn.continuous.scblearn.Tpda(data: GaussianData, cmi_threshold=0.006)

Bases: Mwst

Three-phase dependency analysis (TPDA). See Learning Belief Networks from Data: An Information Theory Based Approach.

__init__(data: GaussianData, cmi_threshold=0.006)

Initialize the TPDA learner.

Parameters:
  • data – Data.

  • cmi_threshold – Threshold at or above which variables are considered conditionally dependent.

get_network() DiGraph

Learn the TPDA network.

Returns:

Directed graph.

learn_undirected_graph() Graph

Learns an undirected graph.

Returns:

Undirected graph.

Discrete, Search-and-Scoring

Search-and-scoring structure learning for discrete data.

class pyspark_bbn.discrete.ssslearn.Ga(data: DiscreteData, sc: SparkContext, max_parents=4, mutation_rate=0.25, pop_size=100, crossover_prob=0.5, max_iters=20, convergence_threshold=3, ordering='mwst', ordering_params={'cmi_threshold': 0.06}, seed=37)

Bases: object

Uses a genetic algorithm to search and score candidate networks. The algorithm is a hybrid approach: the node ordering is first induced by a constraint-based algorithm (MWST, PC or TPDA), and the ordered nodes then constrain the candidate parents of each node; later nodes cannot be parents of earlier ones. See Learning Bayesian Networks: Search Methods and Experimental Results.

__init__(data: DiscreteData, sc: SparkContext, max_parents=4, mutation_rate=0.25, pop_size=100, crossover_prob=0.5, max_iters=20, convergence_threshold=3, ordering='mwst', ordering_params={'cmi_threshold': 0.06}, seed=37)

Initialize the genetic algorithm learner.

Parameters:
  • data – Data.

  • sc – Spark context.

  • max_parents – Maximum number of parents (default=4).

  • mutation_rate – Mutation rate (default=0.25).

  • pop_size – Population size (default=100).

  • crossover_prob – Crossover probability (default=0.5).

  • max_iters – Maximum iterations (default=20).

  • convergence_threshold – Convergence threshold; terminate when no improvement is made after this many generations (default=3).

  • ordering – Ordering method: mwst, pc or tpda (default=mwst).

  • ordering_params – Ordering parameters to the ordering method.

  • seed – Seed for random number generation (default=37).

get_network() DiGraph

Gets the network structure.
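
A minimal sketch (data is assumed to be a DiscreteData instance; the parameter values are illustrative only):

    from pyspark_bbn.discrete.ssslearn import Ga

    ga = Ga(data, data.get_spark_context(),
            max_parents=3, pop_size=50, max_iters=10, seed=37)
    g = ga.get_network()  # networkx DiGraph of the best-scoring structure found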

Continuous, Search-and-Scoring

Search-and-scoring structure learning for continuous data.

class pyspark_bbn.continuous.ssslearn.Ga(data: GaussianData, sc: SparkContext, max_parents=4, mutation_rate=0.25, pop_size=100, crossover_prob=0.5, max_iters=20, convergence_threshold=3, ordering='mwst', ordering_params={'cmi_threshold': 0.0001}, seed=37)

Bases: object

Uses a genetic algorithm to search and score candidate networks. The algorithm is a hybrid approach: the node ordering is first induced by a constraint-based algorithm (MWST, PC or TPDA), and the ordered nodes then constrain the candidate parents of each node; later nodes cannot be parents of earlier ones. See Learning Bayesian Networks: Search Methods and Experimental Results.

__init__(data: GaussianData, sc: SparkContext, max_parents=4, mutation_rate=0.25, pop_size=100, crossover_prob=0.5, max_iters=20, convergence_threshold=3, ordering='mwst', ordering_params={'cmi_threshold': 0.0001}, seed=37)

Initialize the genetic algorithm learner.

Parameters:
  • data – Data.

  • sc – Spark context.

  • max_parents – Maximum number of parents (default=4).

  • mutation_rate – Mutation rate (default=0.25).

  • pop_size – Population size (default=100).

  • crossover_prob – Crossover probability (default=0.5).

  • max_iters – Maximum iterations (default=20).

  • convergence_threshold – Convergence threshold; terminate when no improvement is made after this many generations (default=3).

  • ordering – Ordering method: mwst, pc or tpda (default=mwst).

  • ordering_params – Ordering parameters to the ordering method.

  • seed – Seed for random number generation (default=37).

get_network() DiGraph

Gets the network structure.

Parameter Learning

Parameter learning for discrete data.

class pyspark_bbn.discrete.plearn.LargeDataParamLearner(data: DiscreteData, g: DiGraph)

Bases: object

Parameter learner. Useful when the data cannot fit into memory. Parameters are learned sequentially.

__init__(data: DiscreteData, g: DiGraph)

Initialize the parameter learner for large datasets.

Parameters:
  • data – Data.

  • g – Directed acyclic graph.

get_params() Dict[str, List[Dict[str, str | float]]]

Gets the parameters.

Returns:

Dictionary of parameters.

class pyspark_bbn.discrete.plearn.ParamLearner(data: DiscreteData, g: DiGraph)

Bases: object

Parameter learner. All parameters are learned in parallel, but this requires extensive Spark tuning.

__init__(data: DiscreteData, g: DiGraph)

Initialize the parameter learner.

Parameters:
  • data – Data.

  • g – Directed acyclic graph.

get_params() Dict[str, List[Dict[str, str | float]]]

Gets the parameters.

Returns:

Dictionary of parameters.

class pyspark_bbn.discrete.plearn.SmallDataParamLearner(data: DiscreteData, g: DiGraph)

Bases: object

Parameter learner. Useful when the data fits into memory. Parameters are learned in parallel.

__init__(data: DiscreteData, g: DiGraph)

Initialize the parameter learner for small datasets.

Parameters:
  • data – Data.

  • g – Directed acyclic graph.

get_params() Dict[str, List[Dict[str, str | float]]]

Gets the parameters.

Returns:

Dictionary of parameters.
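
A minimal sketch of learning a structure and then its parameters (data is assumed to be a DiscreteData instance with a class variable named 'clazz'):

    from pyspark_bbn.discrete.scblearn import Naive
    from pyspark_bbn.discrete.plearn import SmallDataParamLearner

    g = Naive(data, clazz='clazz').get_network()
    p = SmallDataParamLearner(data, g).get_params()  # {node: [CPT entries], ...}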

pyspark_bbn.discrete.plearn.decode_row(r: Row, encodings: Dict[str, Dict[str, Dict[Any, Any]]]) Row

Decodes a row from numeric back to categorical values.

Parameters:
  • r – Row.

  • encodings – Dictionary of forward and backward encodings.

Returns:

Row.

pyspark_bbn.discrete.plearn.encode_data(df: DataFrame, encodings: Dict[str, Dict[str, Dict[Any, Any]]]) DataFrame

Encodes a Spark dataframe.

Parameters:
  • df – Spark dataframe.

  • encodings – Dictionary of forward and backward encodings.

Returns:

Spark dataframe.

pyspark_bbn.discrete.plearn.encode_row(r: Row, encodings: Dict[str, Dict[str, Dict[Any, Any]]]) Row

Encodes a row from categorical values to numeric ones.

Parameters:
  • r – Row.

  • encodings – Dictionary of forward and backward encodings.

Returns:

Row.

pyspark_bbn.discrete.plearn.estimate_scikit_cpt(X_cols: List[str], y_col: str, pdf: DataFrame, encodings: Dict[str, Dict[str, Dict[Any, Any]]]) List[Dict[Any, Any]]

Estimate the CPT (Scikit).

Parameters:
  • X_cols – X columns.

  • y_col – y column.

  • pdf – Pandas dataframe.

  • encodings – Dictionary of forward and backward encodings.

Returns:

CPT.

pyspark_bbn.discrete.plearn.estimate_spark_cpt(X_cols: List[str], y_col: str, sdf: DataFrame, encodings: Dict[str, Dict[str, Dict[Any, Any]]], spark: SparkSession) Dict[str, List[Dict[Any, Any]]]

Estimate the CPT (Spark).

Parameters:
  • X_cols – X columns.

  • y_col – y column.

  • sdf – Spark dataframe.

  • encodings – Dictionary of forward and backward encodings.

  • spark – Spark session.

Returns:

CPT.

pyspark_bbn.discrete.plearn.get_encodings(sdf: DataFrame) Dict[str, Dict[str, Dict[Any, Any]]]

Gets encoding of categorical values to numeric (integer) ones.

Parameters:

sdf – Spark dataframe.

Returns:

Dictionary of forward and backward encodings.

pyspark_bbn.discrete.plearn.get_pandas_Xy(X_cols: List[str], y_col: str, pdf: DataFrame) DataFrame

Gets a Pandas dataframe in the Xy form.

Parameters:
  • X_cols – X columns.

  • y_col – y column.

  • pdf – Pandas dataframe.

Returns:

Xy Pandas dataframe.

pyspark_bbn.discrete.plearn.get_parent_child(g: DiGraph) List[Tuple[List[str], str]]

Converts the graph to a list of tuples, where the first item in each tuple is the list of parents and the second is the child.

Parameters:

g – Directed acyclic graph.

Returns:

List of parent/child pairs.

pyspark_bbn.discrete.plearn.get_scikit_cpt(X_cols: List[str], y_col: str, m: Pipeline, encodings: Dict[str, Dict[str, Dict[Any, Any]]]) List[Dict[Any, Any]]

Gets the CPT for the specified y variable (Scikit).

Parameters:
  • X_cols – X columns.

  • y_col – y column.

  • m – Classification model.

  • encodings – Dictionary of forward and backward encodings.

Returns:

CPT.

pyspark_bbn.discrete.plearn.get_scikit_model(X: DataFrame, y: Series) Pipeline

Gets a Scikit logistic regression model.

Parameters:
  • X – Pandas dataframe.

  • y – Pandas series.

Returns:

Scikit pipeline.

pyspark_bbn.discrete.plearn.get_spark_Xy(X_cols: List[str], y_col: str, sdf: DataFrame) DataFrame

Gets a Spark dataframe in the Xy form.

Parameters:
  • X_cols – X columns.

  • y_col – y column.

  • sdf – Spark dataframe.

Returns:

Xy Spark dataframe.

pyspark_bbn.discrete.plearn.get_spark_cpt(X_cols: List[str], y_col: str, m: PipelineModel, encodings: Dict[str, Dict[str, Dict[Any, Any]]], spark: SparkSession) Dict[str, List[Dict[Any, Any]]]

Gets the CPT for the specified y variable (Spark).

Parameters:
  • X_cols – X columns.

  • y_col – y column.

  • m – Classification model.

  • encodings – Dictionary of forward and backward encodings.

  • spark – Spark session.

Returns:

CPT.

pyspark_bbn.discrete.plearn.get_spark_model(Xy: DataFrame) PipelineModel

Gets a logistic regression model.

Parameters:

Xy – Spark dataframe.

Returns:

Spark pipeline.

Utilities

Utilities to make life easier.

Discrete Utilities

pyspark_bbn.discrete.util.get_triplets(g: Graph) Iterator[Tuple[str, str, str]]

Gets all triplets (x, y, z) where x–y and y–z, but not x–z.

Parameters:

g – Undirected graph.

Returns:

List of triplets.
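
A minimal sketch using a NetworkX path graph a–b–c, where a–b and b–c but not a–c:

    import networkx as nx
    from pyspark_bbn.discrete.util import get_triplets

    g = nx.Graph()
    g.add_edges_from([('a', 'b'), ('b', 'c')])

    # the only qualifying triplet has 'b' as the middle variable
    print(list(get_triplets(g)))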

pyspark_bbn.discrete.util.get_triplets_rdd(g: Graph, sparkContext: SparkContext) RDD[Tuple[List[str], List[str], List[str]]]

Returns an RDD of triplets (x, y, z) where x–y and y–z but not x–z, derived in a memory-efficient way using Spark.

Parameters:
  • g – A NetworkX undirected graph.

  • sparkContext – A SparkContext.

Returns:

An RDD of triplets ([x], [z], [y]) where x–y, y–z, and not x–z.

pyspark_bbn.discrete.util.log_gamma(x: int) float

Computes log gamma(x), where gamma(x) = (x - 1)!. If x=5, then gamma(5) = (5-1)! = 4! = 4 x 3 x 2 x 1, and log(gamma(5)) = log((5-1)!) = log(4!) = log(4) + log(3) + log(2) + log(1).

Parameters:

x – Positive integer.

Returns:

Log gamma(x).

pyspark_bbn.discrete.util.log_gamma_ratio(numerator: int, denominator: int) float

Computes the ratio of gammas in log-space.

Parameters:
  • numerator – Numerator gamma.

  • denominator – Denominator gamma.

Returns:

log(gamma(numerator) / gamma(denominator)).
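
A small worked check of both helpers against the standard library (math.lgamma computes the same log-gamma):

    import math
    from pyspark_bbn.discrete.util import log_gamma, log_gamma_ratio

    print(log_gamma(5))           # log(4!) = log(24) ≈ 3.178
    print(math.lgamma(5))         # same value from the standard library
    print(log_gamma_ratio(6, 4))  # log(gamma(6) / gamma(4)) = log(120 / 6) = log(20) ≈ 2.996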

Discrete BBN Utilities

pyspark_bbn.discrete.bbn.get_pybbn_data(g: DiGraph, p: Dict[str, List[Dict[str, str | float]]]) Dict[str, Any]

Gets JSON data for Py-BBN.

Parameters:
  • g – Directed acyclic graph.

  • p – Parameters.

Returns:

Data.
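
A minimal sketch of assembling the Py-BBN data from a learned structure and parameters (data is assumed to be a DiscreteData instance):

    from pyspark_bbn.discrete.scblearn import Mwst
    from pyspark_bbn.discrete.plearn import ParamLearner
    from pyspark_bbn.discrete.bbn import get_pybbn_data

    g = Mwst(data).get_network()
    p = ParamLearner(data, g).get_params()
    pybbn_data = get_pybbn_data(g, p)  # JSON-style dictionary for Py-BBN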

Data Utilities

class pyspark_bbn.util.data.Discretizer(numBuckets=5, strategy='mean', relativeError=0.001)

Bases: Estimator, DefaultParamsReadable, DefaultParamsWritable

Discretizer for numeric columns in a DataFrame. This class handles missing values, scales the data, and applies quantile discretization.

__init__(numBuckets=5, strategy='mean', relativeError=0.001)

Initialize the Discretizer.

Parameters:
  • numBuckets – Number of buckets for discretization.

  • strategy – Imputation strategy (e.g., “mean”, “median”).

  • relativeError – Relative error for quantile discretization.
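
A minimal sketch (sdf is assumed to be a Spark DataFrame with numeric columns; fit and transform follow the usual Spark ML Estimator/Transformer contract):

    from pyspark_bbn.util.data import Discretizer

    model = Discretizer(numBuckets=5, strategy='mean').fit(sdf)
    discretized_sdf = model.transform(sdf)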

class pyspark_bbn.util.data.DiscretizerModel(imputerModel, scalerModel, discretizerModel, inputCols)

Bases: Transformer, DefaultParamsReadable, DefaultParamsWritable

Model for discretizing numeric columns in a DataFrame. This class handles missing values, scales the data, and applies quantile discretization.

__init__(imputerModel, scalerModel, discretizerModel, inputCols)

Initialize the DiscretizerModel.

Parameters:
  • imputerModel – The imputer model.

  • scalerModel – The scaler model.

  • discretizerModel – The discretizer model.

  • inputCols – The input columns to be discretized.

pyspark_bbn.util.data.discretize(df: DataFrame, threshold=0.8, id_col='__id__') Tuple[DataFrame, Pipeline]

Discretize the numeric columns in a DataFrame. Fields that take only one value, or that are dominated by a single value, are dropped.

Parameters:
  • df – The input DataFrame.

  • threshold – The threshold for dropping fields. If threshold=0.8 and the dominant value occurs in 80% or more of the rows, the field is dropped.

  • id_col – The ID column to be excluded from discretization.

Returns:

A tuple containing the discretized DataFrame and the pipeline.
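
A minimal sketch (sdf is assumed to be a Spark DataFrame with numeric columns):

    from pyspark_bbn.util.data import discretize

    discretized_sdf, pipeline = discretize(sdf, threshold=0.8)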

pyspark_bbn.util.data.get_discretizer_model() Pipeline

Get a pipeline with a discretizer model.

Returns:

A Pipeline with a discretizer model.

pyspark_bbn.util.data.get_domains(df: DataFrame) DataFrame

Get the domains of a DataFrame.

Parameters:

df – The DataFrame to analyze.

Returns:

A DataFrame containing the domains.

pyspark_bbn.util.data.get_missingness_profile(df: DataFrame) DataFrame

Get the missingness profile of a DataFrame.

Parameters:

df – The DataFrame to analyze.

Returns:

A DataFrame containing the missingness profile.

pyspark_bbn.util.data.get_validity(v) Tuple[bool, bool, bool]

Get the validity of a value.

Parameters:

v – The value to check.

Returns:

A tuple containing three booleans:

  • is_na: True if the value is NaN or None.

  • is_inf: True if the value is infinite.

  • is_valid: True if the value is valid.

Return type:

tuple[bool, bool, bool]
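
A small example, following the return description above:

    from pyspark_bbn.util.data import get_validity

    print(get_validity(1.0))           # (False, False, True)
    print(get_validity(float('nan')))  # (True, False, False)
    print(get_validity(float('inf')))  # (False, True, False)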

pyspark_bbn.util.data.is_inf(v: Any) bool

Check if the value is infinite.

Parameters:

v – The value to check.

Returns:

True if the value is infinite, otherwise False.

pyspark_bbn.util.data.is_na(v: Any) bool

Check if the value is NaN or None.

Parameters:

v – The value to check.

Returns:

True if the value is NaN or None, otherwise False.

pyspark_bbn.util.data.is_valid(v: Any) bool

Check if the value is valid (not NaN and not infinite).

Parameters:

v – The value to check.

Returns:

True if the value is valid, otherwise False.

pyspark_bbn.util.data.normalize_invalids(df: DataFrame) DataFrame

Normalize invalid values in a DataFrame.

Parameters:

df – The input DataFrame.

Returns:

A DataFrame with normalized invalid values.

pyspark_bbn.util.data.vec_to_array(v)

Convert a Spark Vector to a list.

Parameters:

v – The Spark Vector to convert.

Returns:

A list representation of the Spark Vector.

pyspark_bbn.util.data.vec_to_array_udf(v)

Convert a Spark Vector to a list.

Parameters:

v – The Spark Vector to convert.

Returns:

A list representation of the Spark Vector.