diff --git a/causal_testing/__main__.py b/causal_testing/__main__.py index aee3b323..433d70bf 100644 --- a/causal_testing/__main__.py +++ b/causal_testing/__main__.py @@ -4,7 +4,10 @@ import logging import os import tempfile +import pandas as pd from pathlib import Path +from importlib.metadata import entry_points +import networkx as nx from causal_testing.testing.metamorphic_relation import generate_causal_tests @@ -20,74 +23,112 @@ def main() -> None: # Parse arguments args = parse_args() - - if args.command == Command.GENERATE: - logging.info("Generating causal tests") - generate_causal_tests( - args.dag_path, - args.output, - args.ignore_cycles, - args.threads, - effect_type=args.effect_type, - estimate_type=args.estimate_type, - estimator=args.estimator, - skip=False, - ) - logging.info("Causal test generation completed successfully") - return - # Setup logging setup_logging(args.verbose) - # Create paths object - paths = CausalTestingPaths( - dag_path=args.dag_path, - data_paths=args.data_paths, - test_config_path=args.test_config, - output_path=args.output, - ) - - # Create and setup framework - framework = CausalTestingFramework(paths, ignore_cycles=args.ignore_cycles, query=args.query) - framework.setup() - - # Load and run tests - framework.load_tests() - - if args.batch_size > 0: - logging.info(f"Running tests in batches of size {args.batch_size}") - with tempfile.TemporaryDirectory() as tmpdir: - output_files = [] - for i, results in enumerate( - framework.run_tests_in_batches( - batch_size=args.batch_size, - silent=args.silent, - adequacy=args.adequacy, - bootstrap_size=args.bootstrap_size, + match args.command: + case Command.GENERATE: + logging.info("Generating causal tests") + generate_causal_tests( + args.dag_path, + args.output, + args.ignore_cycles, + args.threads, + effect_type=args.effect_type, + estimate_type=args.estimate_type, + estimator=args.estimator, + skip=False, + ) + logging.info("Causal test generation completed successfully.") + + case Command.DISCOVER: + discover_map = {ff.name: ff for ff in entry_points(group="discovery")} + if args.technique not in discover_map: + raise ValueError( + f"Unsupported technique {args.technique}. Supported: {sorted(discover_map)}. " + "If you have implemented a custom technique, you will need to add this to your entrypoints via your " + "pyproject.toml file." ) - ): - temp_file_path = os.path.join(tmpdir, f"output_{i}.json") - framework.save_results(results, temp_file_path) - output_files.append(temp_file_path) - del results - - # Now stitch the results together from the temporary files - all_results = [] - for file_path in output_files: - with open(file_path, "r", encoding="utf-8") as f: - all_results.extend(json.load(f)) - - output_path = Path(args.output) - output_path.parent.mkdir(parents=True, exist_ok=True) - - with open(args.output, "w", encoding="utf-8") as f: - json.dump(all_results, f, indent=4) - else: - logging.info("Running tests in regular mode") - results = framework.run_tests(silent=args.silent, adequacy=args.adequacy, bootstrap_size=args.bootstrap_size) - framework.save_results(results) - - logging.info("Causal testing completed successfully.") + kwargs = {} + for argument in args.technique_kwargs: + split = argument.split("=") + if len(split) != 2: + raise ValueError(f"Malformed argument {argument}. Should be specified as `arg_name=arg_value`") + kwargs[split[0]] = split[1] + + logging.info("Discovering causal structure") + # Need to reset index to allow for multiple files having the same index (i.e. starting at zero). + # Otherwise you end up with duplicate indices, which causes problems further down the line + df = pd.concat([pd.read_csv(path) for path in args.data_paths]).reset_index() + if args.variables: + df = df[args.variables] + + discover_class = discover_map[args.technique].load() + discover = discover_class( + df=df, + exclude_edges=( + list(nx.nx_pydot.read_dot(args.exclude_edges).edges()) if args.exclude_edges is not None else [] + ), + include_edges=( + list(nx.nx_pydot.read_dot(args.include_edges).edges()) if args.include_edges is not None else [] + ), + **kwargs, + ) + evolved_dag = discover.discover() + discover.write_dot(evolved_dag, args.output) + logging.info("Causal structure discovery completed successfully.") + case Command.TEST: + # Create paths object + paths = CausalTestingPaths( + dag_path=args.dag_path, + data_paths=args.data_paths, + test_config_path=args.test_config, + output_path=args.output, + ) + + # Create and setup framework + framework = CausalTestingFramework(paths, ignore_cycles=args.ignore_cycles, query=args.query) + framework.setup() + + # Load and run tests + framework.load_tests() + + if args.batch_size > 0: + logging.info(f"Running tests in batches of size {args.batch_size}") + with tempfile.TemporaryDirectory() as tmpdir: + output_files = [] + for i, results in enumerate( + framework.run_tests_in_batches( + batch_size=args.batch_size, + silent=args.silent, + adequacy=args.adequacy, + bootstrap_size=args.bootstrap_size, + ) + ): + temp_file_path = os.path.join(tmpdir, f"output_{i}.json") + framework.save_results(results, temp_file_path) + output_files.append(temp_file_path) + del results + + # Now stitch the results together from the temporary files + all_results = [] + for file_path in output_files: + with open(file_path, "r", encoding="utf-8") as f: + all_results.extend(json.load(f)) + + output_path = Path(args.output) + output_path.parent.mkdir(parents=True, exist_ok=True) + + with open(args.output, "w", encoding="utf-8") as f: + json.dump(all_results, f, indent=4) + else: + logging.info("Running tests in regular mode") + results = framework.run_tests( + silent=args.silent, adequacy=args.adequacy, bootstrap_size=args.bootstrap_size + ) + framework.save_results(results) + + logging.info("Causal testing completed successfully.") if __name__ == "__main__": diff --git a/causal_testing/discovery/__init__.py b/causal_testing/discovery/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/causal_testing/discovery/abstract_discovery.py b/causal_testing/discovery/abstract_discovery.py new file mode 100644 index 00000000..597ce8ec --- /dev/null +++ b/causal_testing/discovery/abstract_discovery.py @@ -0,0 +1,221 @@ +""" +This module implements the abstract Discovery class to infer causal DAGs from data. +""" + +from abc import ABC, abstractmethod +import random +from itertools import permutations +from enum import Enum +import re +import warnings + +import networkx as nx +import rustworkx as rx +import pandas as pd + +from causal_testing.specification.causal_dag import CausalDAG +from causal_testing.testing.causal_test_result import CausalTestResult +from causal_testing.testing.causal_effect import Positive, Negative +from causal_testing.main import CausalTestingFramework +from causal_testing.specification.scenario import Scenario +from causal_testing.testing.metamorphic_relation import generate_metamorphic_relations + +TestResult = Enum("TestResult", [("PASS", 2), ("FAIL", 0), ("INESTIMABLE", 1)]) + +# Ignore warnings from statsmodels when we try to evaluate test cases +warnings.simplefilter("ignore") + + +def simple_cycle(causal_dag: CausalDAG): + """ + Find a cycle in the given CausalDAG, if one exists, returns the first found. + + :param causal_dag: The CausalDAG to check for cycles. + :returns: A list of edges in the cycle, or an empty list if there are no cycles. + """ + rx_graph = rx.networkx_converter(causal_dag) + return [(rx_graph[i], rx_graph[j]) for i, j in rx.digraph_find_cycle(rx_graph)] + + +def effect_direction(result: CausalTestResult) -> str: + """ + Check whether the estimated causal effect is negative or positive. + + :param result: The causal test result object. + :returns: Whether the estimated causal test is positive or negative (or no effect). + """ + if pd.api.types.is_numeric_dtype( + result.estimator.df[result.estimator.base_test_case.treatment_variable.name] + ) and pd.api.types.is_numeric_dtype(result.estimator.df[result.estimator.base_test_case.outcome_variable.name]): + if Negative().apply(result): + return "negative" + if Positive().apply(result): + return "positive" + return None + + +def is_match(u: str, v: str, patterns: list[str]): + """ + Check whether a given edge matches a given pattern. + + :param u: The origin node of the edge. + :param v: The destination node of the edge. + :param patterns: A list of tuples containing the patterns to check against. + :returns: True if the edge matches the pattern, False otherwise. + """ + return any(re.fullmatch(u, pat_u) and re.fullmatch(v, pat_v) for pat_u, pat_v in patterns) + + +class Discovery(ABC): + """ + Abstract class for causal discovery. + """ + + def __init__( + self, + df: pd.DataFrame, + random_seed: int = 0, + excluded_ptns: str = None, + included_ptns: str = None, + ): + random.seed(random_seed) + self.df = df + self.random_seed = random_seed + + possible_edges = [] + included_edges = [] + excluded_edges = [] + + for u, v in permutations(df.columns, 2): + if is_match(u, v, excluded_ptns): + excluded_edges.append((u, v)) + else: + possible_edges.append((u, v)) + + if included_ptns and is_match(u, v, included_ptns): + included_edges.append((u, v)) + + initial_dag = CausalDAG() + initial_dag.add_edges_from(included_edges) + if not initial_dag.is_acyclic(): + raise ValueError( + "Specified included edges include a cycle, making it impossible to infer a DAG. " + "Please resolve this and try again." + ) + + self.possible_edges = sorted(possible_edges) + self.included_edges = sorted(included_edges) + self.excluded_edges = sorted(excluded_edges) + + @abstractmethod + def discover(self) -> CausalDAG: + """ + Discover the causal DAG. + + :returns: The inferred causal DAG. + """ + + def remove_cycles(self, causal_dag: CausalDAG): + """ + Remove cycles from individuals by iteratively deleting a random edge from each cycle until there are no more + cycles. + + :param causal_dag: The CausalDAG to be repaired. + """ + nodes = causal_dag.nodes + cycle = simple_cycle(causal_dag) + while cycle: + idx = random.choice(range(len(cycle))) + while cycle[idx] in self.included_edges: + idx = (idx + 1) % len(cycle) + causal_dag.remove_edge(cycle[idx][0], cycle[idx][1]) + cycle = simple_cycle(causal_dag) + causal_dag.add_nodes_from(nodes) + + def write_dot(self, individual: CausalDAG, output_file: str): + """ + Write the given individual to the given output file. + + :param individual: The causal DAG to output. + :param output_file: The name of the file to write to. + """ + if hasattr(individual, "test_results"): + print(individual.test_results) + for _, test in individual.test_results.iterrows(): + if (test["treatment"], test["outcome"]) in individual.edges: + if test["result"] == TestResult.PASS: + individual[test["treatment"]][test["outcome"]]["color"] = "green" + elif test["result"] == TestResult.INESTIMABLE: + individual[test["treatment"]][test["outcome"]]["color"] = "orange" + elif test["result"] == TestResult.FAIL: + individual[test["treatment"]][test["outcome"]]["color"] = "red" + else: + raise ValueError(f"Invalid test outcome {test['result']}") + else: + individual.add_edge(test["treatment"], test["outcome"], ignore_cycles=True) + individual[test["treatment"]][test["outcome"]]["style"] = "dashed" + if test["result"] == TestResult.PASS: + individual[test["treatment"]][test["outcome"]]["style"] = "invis" + individual[test["treatment"]][test["outcome"]]["constraint"] = False + elif test["result"] == TestResult.INESTIMABLE: + individual[test["treatment"]][test["outcome"]]["color"] = "orange" + elif test["result"] == TestResult.FAIL: + individual[test["treatment"]][test["outcome"]]["color"] = "red" + else: + raise ValueError(f"Invalid test outcome {test['result']}") + + nx.drawing.nx_pydot.write_dot(individual, output_file) + + def evaluate_tests(self, causal_dag: CausalDAG) -> pd.DataFrame: + """ + Generate and evaluate causal test cases from the supplied CausalDAG and return a list of edges for which the + corresponding causal test case failed. + These results are then assigned to a new attribute `test_results` within the individual for later reuse. + + :param causal_dag: The CausalDAG to evaluate. + :returns: Pandas dataframe with test outcome details + (result, expected effect, treatment, outcome, effect direction). + """ + + ctf = CausalTestingFramework(None) + ctf.dag = causal_dag + ctf.data = self.df + ctf.create_variables() + ctf.scenario = Scenario(list(ctf.variables["inputs"].values()) + list(ctf.variables["outputs"].values())) + ctf.test_cases = ctf.create_test_cases( + { + "tests": [ + relation.to_json_stub( + estimator="LogisticRegressionEstimator", estimate_type="unit_odds_ratio", alpha=0.01 + ) + for relation in generate_metamorphic_relations(causal_dag) + ] + } + ) + results = [] + + for test_case, result in zip(ctf.test_cases, ctf.run_tests(silent=True)): + if result.effect_estimate is None: + results.append( + { + "result": TestResult.INESTIMABLE, + "expected_effect": test_case.expected_causal_effect.__class__.__name__, + "treatment": test_case.base_test_case.treatment_variable.name, + "outcome": test_case.base_test_case.outcome_variable.name, + } + ) + else: + results.append( + { + "result": ( + TestResult.PASS if test_case.expected_causal_effect.apply(result) else TestResult.FAIL + ), + "expected_effect": test_case.expected_causal_effect.__class__.__name__, + "treatment": test_case.base_test_case.treatment_variable.name, + "outcome": test_case.base_test_case.outcome_variable.name, + "effect": effect_direction(result), + } + ) + + causal_dag.test_results = pd.DataFrame(results) + return pd.DataFrame(results) diff --git a/causal_testing/discovery/hill_climber_discovery.py b/causal_testing/discovery/hill_climber_discovery.py new file mode 100644 index 00000000..c95d4288 --- /dev/null +++ b/causal_testing/discovery/hill_climber_discovery.py @@ -0,0 +1,142 @@ +""" +This module implements a hill climbing algorithm to optimise causal DAGs based on the tests that pass/fail. +""" + +import random +import time + +import numpy as np +import pandas as pd + +from causal_testing.specification.causal_dag import CausalDAG +from causal_testing.discovery.abstract_discovery import Discovery, TestResult + + +class HillClimberDiscovery(Discovery): + """ + Simple hill climber evolution of cauasl DAGs via 1+1EA. + Attempts to maximise the number of passing tests and minimise the number of failing tests. + """ + + def __init__( # pylint: disable=R0917 + self, + df: pd.DataFrame, + random_seed: int = 0, + include_edges: str = None, + exclude_edges: str = None, + max_iterations: int = 100, + max_iterations_without_improvement: int = 10, + ): + super().__init__(df, random_seed, include_edges, exclude_edges) + self.max_iterations = int(max_iterations) + self.max_iterations_without_improvement = int(max_iterations_without_improvement) + + def sum_test_outcomes(self, test_results: pd.DataFrame) -> dict: + """ + Aggregate the number of passing, failing, and inestimable tests + :param test_results: Dataframe containing the raw pass/fail/inestimable outcome of each test case. + :returns: Dictionary containing the number of pass/fail/inestimable outcomes. + """ + counts = pd.concat( + [ + pd.DataFrame(np.sort(test_results[["treatment", "outcome"]], axis=1), columns=["treatment", "outcome"]), + pd.get_dummies(test_results["result"]).astype(int), + ], + axis=1, + ) + # Ensure every column is initialised - Test outcomes that never occurred won't be in the dataframe otherwise + for col in TestResult: + if col not in counts.columns: + counts[col] = 0 + counts = counts.groupby(["treatment", "outcome"]).sum().reset_index()[list(TestResult)] + # The below line normalises by the number of tests *for each edge* + # Independence tests X _||_ Y get two tests (X _||_ Y and Y _||_ X) because we don't know which way the + # causality flows. We need to normalise this (e.g. if X _||_ Y and Y _||_ X both pass, then the score should be + # 1 rather than 2) otherwise we end up unintentionally optimising for more independences. + counts = counts.apply(lambda col: col / counts.sum(axis=1)) + + return counts.sum(axis=0).to_dict() + + def evaluate_fitness( + self, + individual: CausalDAG, + ) -> tuple[tuple[float, float, float], list[tuple[str, str]]]: + """ + Evaluate the fitness of a given causal DAG by evaluating the corresponding test cases using a tier based + fitness metric. + lexicographical order (max pass, minimise failure, minimise unknown) + e.g. (X pass, Y fail, Z+1 unknown) is better than (X pass, Y+1 fail, Z unknown) + + + :param individual: The candidate individual to evaluate. + :returns: Tuple of the form (X, Y), where X is a triple containing the number of passing, failing, and + inestimable tests respectively, and Y is a list of failing edges. + """ + self.evaluate_tests(individual) + counts = self.sum_test_outcomes(individual.test_results) + + problem_tests = individual.test_results.loc[individual.test_results["result"] != TestResult.PASS] + problem_edges = problem_tests[["treatment", "outcome"]].apply(tuple, axis=1).tolist() + problem_edges.extend( + problem_tests.query("expected_effect == 'NoEffect'")[["outcome", "treatment"]].apply(tuple, axis=1).tolist() + ) + + fitness_values = ( + counts.get(TestResult.PASS, 0), + -counts.get(TestResult.FAIL, 0), + -counts.get(TestResult.INESTIMABLE, 0), + ) + return fitness_values, problem_edges + + def discover(self) -> CausalDAG: + """ + Discover the causal DAG. + + :returns: The inferred causal DAG. + """ + + start_time = time.time() + individual = CausalDAG() + individual.add_nodes_from(self.df.columns) + individual.add_edges_from(self.possible_edges) + self.remove_cycles(individual) + fitness_values, problem_edges = self.evaluate_fitness(individual) + + iterations = self.max_iterations + iterations_without_improvement = 0 + + while problem_edges and iterations and iterations_without_improvement < self.max_iterations_without_improvement: + iterations -= 1 + + new_individual = individual.copy() + for origin, dest in random.sample( + # If we've gone over the maximum iterations without improvement + problem_edges + + ( + self.possible_edges + if iterations_without_improvement > self.max_iterations_without_improvement + else [] + ), + random.randint(1, len(problem_edges)), + ): + if new_individual.has_edge(origin, dest) and (origin, dest) not in self.included_edges: + new_individual.remove_edge(origin, dest) + elif not new_individual.has_edge(origin, dest) and (origin, dest) not in self.excluded_edges: + # Want to bypass the cycle check of CausalDAG as we remove the cycles afterwards + new_individual.add_edge(origin, dest, ignore_cycles=True) + self.remove_cycles(new_individual) + new_fitness_values, new_problem_edges = self.evaluate_fitness(new_individual) + + if new_fitness_values > fitness_values: + fitness_values = new_fitness_values + problem_edges = new_problem_edges + individual = new_individual + iterations_without_improvement = 0 + else: + iterations_without_improvement += 1 + + end_time = time.time() + individual.graph["fitness"] = fitness_values + individual.graph["time"] = round(end_time - start_time) + + return individual diff --git a/causal_testing/discovery/nsga_discovery.py b/causal_testing/discovery/nsga_discovery.py new file mode 100644 index 00000000..a0188539 --- /dev/null +++ b/causal_testing/discovery/nsga_discovery.py @@ -0,0 +1,124 @@ +import numpy as np +import pygad +import pandas as pd + +from causal_testing.specification.causal_dag import CausalDAG +from causal_testing.discovery.abstract_discovery import Discovery + + +class NSGADiscovery(Discovery): + """ + Multiobjective evolution of cauasl DAGs via NSGA2. + Attempts to optimise the number of passing tests, where each possible relationship is a "feature". + """ + + def __init__( # pylint: disable=R0917 + self, + df: pd.DataFrame, + random_seed: int = 0, + include_edges: str = None, + exclude_edges: str = None, + max_iterations: int = 100, + num_parents_mating=2, + population_size=5, # Population size + ): + super().__init__(df, random_seed, include_edges, exclude_edges) + self.max_iterations = int(max_iterations) + self.num_parents_mating = num_parents_mating + self.sol_per_pop = population_size + + def binary_string_to_causal_dag(self, individual: np.array) -> CausalDAG: + """ + Converts a binary string representation of a causal DAG to a CausalDAG object. + + :param individual: Bitstring of the same length as `possible_edges` such that 1 at position `i` represents + possible_edges[i] being an edge in the graph and 0 represents it not being. + :returns: The converted CausalDAG instance. + """ + causal_dag = CausalDAG() + origins, destinations = zip(*self.possible_edges) + causal_dag.add_nodes_from(set(origins).union(set(destinations))) + causal_dag.add_edges_from([edge for edge, add in zip(self.possible_edges, individual) if add]) + return causal_dag + + def causal_dag_to_binary_string(self, causal_dag: CausalDAG) -> np.array: + """ + Converts a CausalDAG to a binary string representation. + + :param causal_dag: The CausalDAG to convert. + :returns: The converted binary string such that 1 at position `i` represents possible_edges[i] being an edge in + the graph and 0 represents it not being. + """ + return np.array([int(edge in causal_dag.edges) for edge in self.possible_edges]) + + def multi_objective_fitness( + self, + ga_instance: pygad.GA, # pylint: disable=W0613 + individual: np.array, + individual_inx: int, # pylint: disable=W0613 + ) -> np.array: + """ + Remove cycles and calculate the multi-objective fitness of the resulting causal DAG in terms of tests passing + failing, and being inestimable. + NOTE: this is in terms of the number of possible (X, Y) *relationships* rather than edges, so is not + order dependent. I.e. (X, Y) and (Y, X) are the same. This stops the algorithm optimising for independences, + which get two tests (one in each direction). + + :param ga_instance: The calling GA instance. NOT USED - required for compatibility. + :param individual: The individual to evaluate. + :param individual_inx: The index of the individual in the population. NOT USED - required for compatibility. + :returns: Numeric numpy array representing the outcome of each test. + """ + # Repair by removing cycles + causal_dag = self.binary_string_to_causal_dag(individual) + self.remove_cycles(causal_dag) + individual[:] = self.causal_dag_to_binary_string(causal_dag) + + test_results = self.evaluate_tests(causal_dag) + test_results["result"] = test_results["result"].apply(lambda x: x.value).astype(float) + + # Sort so that the "treatment" is always the first item alphabetically + # This ensures that the fitness vector always represents the same features + test_results[["treatment", "outcome"]] = np.sort(test_results[["treatment", "outcome"]], axis=1) + # We then need to normalise each potential edge since independences get two tests (X _||_ Y and Y _||_ X) + # because we don't know which way the causality flows. + # "groupby" will sort by (treatment, outcome) + result = test_results.groupby(["treatment", "outcome"], sort=True)["result"].mean() + + return result.values + + def discover(self) -> CausalDAG: + """ + Discover the causal DAG. + + :returns: The inferred causal DAG. + """ + + gene_space = [] + for edge in self.possible_edges: + # Excluded edges are not in possible_edges, so no need to explicitly test for this + if edge in self.included_edges: + # Must be included + gene_space.append([1]) + else: + # Free to evolve + gene_space.append([0, 1]) + + ga_instance = pygad.GA( + num_generations=self.max_iterations, + num_parents_mating=self.num_parents_mating, + sol_per_pop=self.sol_per_pop, # Population size + num_genes=len(self.possible_edges), + gene_space=gene_space, + gene_type=int, + fitness_func=self.multi_objective_fitness, + parent_selection_type="nsga2", + random_seed=self.random_seed, + ) + ga_instance.run() + + best_solution, _, _ = ga_instance.best_solution() + best_individual = self.binary_string_to_causal_dag(best_solution) + self.evaluate_tests(best_individual) + + return best_individual diff --git a/causal_testing/estimation/linear_regression_estimator.py b/causal_testing/estimation/linear_regression_estimator.py index 16a41075..1f418358 100644 --- a/causal_testing/estimation/linear_regression_estimator.py +++ b/causal_testing/estimation/linear_regression_estimator.py @@ -98,7 +98,7 @@ def estimate_coefficient(self) -> EffectEstimate: """Estimate the unit average treatment effect of the treatment on the outcome. That is, the change in outcome caused by a unit change in treatment. - :return: The unit average treatment effect and the 95% Wald confidence intervals. + :return: The unit average treatment effect and the Wald confidence intervals. """ model = self.fit_model() newline = "\n" @@ -129,7 +129,7 @@ def estimate_ate(self) -> EffectEstimate: """Estimate the average treatment effect of the treatment on the outcome. That is, the change in outcome caused by changing the treatment variable from the control value to the treatment value. - :return: The average treatment effect and the 95% Wald confidence intervals. + :return: The average treatment effect and the Wald confidence intervals. """ model = self.fit_model() @@ -155,7 +155,7 @@ def estimate_risk_ratio(self, adjustment_config: dict = None) -> EffectEstimate: """Estimate the risk_ratio effect of the treatment on the outcome. That is, the change in outcome caused by changing the treatment variable from the control value to the treatment value. - :return: The average treatment effect and the 95% Wald confidence intervals. + :return: The average treatment effect and the Wald confidence intervals. """ prediction = self._predict(adjustment_config=adjustment_config) control_outcome, treatment_outcome = prediction.iloc[1], prediction.iloc[0] @@ -175,7 +175,7 @@ def estimate_ate_calculated(self, adjustment_config: dict = None) -> EffectEstim their values. N.B. Every variable in the adjustment set MUST have a value in order to estimate the outcome under control and treatment. - :return: The average treatment effect and the 95% Wald confidence intervals. + :return: The average treatment effect and the Wald confidence intervals. """ prediction = self._predict(adjustment_config=adjustment_config) control_outcome, treatment_outcome = prediction.iloc[1], prediction.iloc[0] diff --git a/causal_testing/estimation/logistic_regression_estimator.py b/causal_testing/estimation/logistic_regression_estimator.py index d6bfc5db..fcbb00eb 100644 --- a/causal_testing/estimation/logistic_regression_estimator.py +++ b/causal_testing/estimation/logistic_regression_estimator.py @@ -37,7 +37,7 @@ def estimate_unit_odds_ratio(self) -> EffectEstimate: """Estimate the odds ratio of increasing the treatment by one. In logistic regression, this corresponds to the coefficient of the treatment of interest. - :return: The odds ratio. Confidence intervals are not yet supported. + :return: The odds ratio with confidence intervals. """ model = self.fit_model(self.df) diff --git a/causal_testing/main.py b/causal_testing/main.py index 9fa3fd55..0f9b2568 100644 --- a/causal_testing/main.py +++ b/causal_testing/main.py @@ -31,6 +31,7 @@ class Command(Enum): TEST = "test" GENERATE = "generate" + DISCOVER = "discover" @dataclass @@ -48,6 +49,8 @@ class CausalTestingPaths: data_paths: List[Path] test_config_path: Path output_path: Path + include_edges_path: Optional[Path] = None + exclude_edges_path: Optional[Path] = None def __init__( self, @@ -55,11 +58,15 @@ def __init__( data_paths: List[Union[str, Path]], test_config_path: Union[str, Path], output_path: Union[str, Path], + include_edges_path: Optional[Union[str, Path]] = None, + exclude_edges_path: Optional[Union[str, Path]] = None, ): self.dag_path = Path(dag_path) self.data_paths = [Path(p) for p in data_paths] self.test_config_path = Path(test_config_path) self.output_path = Path(output_path) + self.include_edges_path = Path(include_edges_path) if include_edges_path else None + self.exclude_edges_path = Path(exclude_edges_path) if exclude_edges_path else None def validate_paths(self) -> None: """ @@ -81,6 +88,12 @@ def validate_paths(self) -> None: if not self.output_path.parent.exists(): self.output_path.parent.mkdir(parents=True) + if self.include_edges_path and not self.include_edges_path.exists(): + raise FileNotFoundError(f"Data file not found: {self.include_edges_path}") + + if self.exclude_edges_path and not self.exclude_edges_path.exists(): + raise FileNotFoundError(f"Data file not found: {self.exclude_edges_path}") + class CausalTestingFramework: # pylint: disable=too-many-instance-attributes @@ -567,6 +580,40 @@ def parse_args(args: Optional[Sequence[str]] = None) -> argparse.Namespace: help="Run tests in batches of the specified size (default: 0, which means no batching)", ) + # Discovery + parser_discover = subparsers.add_parser(Command.DISCOVER.value, help="Discover causal structures from data") + parser_discover.add_argument("-d", "--data-paths", help="Paths to data files (.csv)", nargs="+", required=True) + parser_discover.add_argument( + "-t", + "--technique", + help="The name of the technique to use. Currently supported are 'HillClimberDiscovery' and 'NSGADiscovery'", + required=True, + ) + parser_discover.add_argument( + "-V", + "--variables", + help="The subset of variables from the data to consider. Defaults to all.", + nargs="*", + default=[], + ) + parser_discover.add_argument("-o", "--output", help="Path for output DAG file (.dot)", required=True) + parser_discover.add_argument( + "-i", "--include-edges", help="Path to file containing edges to include", required=False + ) + parser_discover.add_argument( + "-e", "--exclude-edges", help="Path to file containing edges to exclude", required=False + ) + parser_discover.add_argument( + "-s", "--fitness-score", help="Use fitness score instead of tiered fitness", action="store_true", default=False + ) + parser_discover.add_argument( + "--technique-kwargs", + help="Keywords for the discovery technique. These should be specified as `arg1=value1 arg2=value2...`.", + nargs="*", + default=[], + ) + parser_discover.add_argument("-v", "--verbose", help="Enable verbose logging", action="store_true", default=False) + args = main_parser.parse_args(args) # Assume the user wants test adequacy if they're setting bootstrap_size diff --git a/causal_testing/specification/causal_dag.py b/causal_testing/specification/causal_dag.py index cc79cb4e..35b9c84b 100644 --- a/causal_testing/specification/causal_dag.py +++ b/causal_testing/specification/causal_dag.py @@ -172,17 +172,18 @@ def check_iv_assumptions(self, treatment, outcome, instrument) -> bool: raise ValueError(f"Instrument {instrument} and outcome {outcome} share common causes") return True - def add_edge(self, u_of_edge: Node, v_of_edge: Node, **attr): + def add_edge(self, u_of_edge: Node, v_of_edge: Node, ignore_cycles: bool = False, **attr): """Add an edge to the causal DAG. Overrides the default networkx method to prevent users from adding a cycle. - :param u_of_edge: From node - :param v_of_edge: To node + :param u_of_edge: Origin node + :param v_of_edge: Destination node + :param ignore_cycles: Whether to ignore cycles that adding the new edge may have introduced. :param attr: Attributes """ super().add_edge(u_of_edge, v_of_edge, **attr) - if not self.is_acyclic(): + if not ignore_cycles and not self.is_acyclic(): raise nx.HasACycle("Invalid Causal DAG: contains a cycle.") def cycle_nodes(self) -> list: diff --git a/causal_testing/testing/causal_effect.py b/causal_testing/testing/causal_effect.py index 898b1fef..8eb3f26c 100644 --- a/causal_testing/testing/causal_effect.py +++ b/causal_testing/testing/causal_effect.py @@ -127,7 +127,7 @@ def apply(self, res: CausalTestResult) -> bool: raise ValueError("Positive Effects are currently only supported on single float datatypes") if res.effect_estimate.type in {"ate", "coefficient"}: return bool(res.effect_estimate.value[0] > 0) - if res.effect_estimate.type == "risk_ratio": + if res.effect_estimate.type in ["risk_ratio", "unit_odds_ratio"]: return bool(res.effect_estimate.value[0] > 1) raise ValueError(f"Test Value type {res.effect_estimate.type} is not valid for this CausalEffect") @@ -141,7 +141,7 @@ def apply(self, res: CausalTestResult) -> bool: raise ValueError("Negative Effects are currently only supported on single float datatypes") if res.effect_estimate.type in {"ate", "coefficient"}: return bool(res.effect_estimate.value[0] < 0) - if res.effect_estimate.type == "risk_ratio": + if res.effect_estimate.type in ["risk_ratio", "unit_odds_ratio"]: return bool(res.effect_estimate.value[0] < 1) # Dead code but necessary for pylint raise ValueError(f"Test Value type {res.effect_estimate.type} is not valid for this CausalEffect") diff --git a/docs/source/index.rst b/docs/source/index.rst index bd7a8b51..e0536a98 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -43,6 +43,7 @@ If you have any questions about our framework, you can also reach us by `email < /modules/estimators /modules/custom_estimators /modules/causal_testing + /modules/discovery .. toctree:: :maxdepth: 2 diff --git a/docs/source/modules/discovery.rst b/docs/source/modules/discovery.rst new file mode 100644 index 00000000..2277cad0 --- /dev/null +++ b/docs/source/modules/discovery.rst @@ -0,0 +1,41 @@ +================ +Causal Discovery +================ + +The Causal Discovery tool generates a directed acyclic graph (DAG) that represents the causal relationships between +variables in your input dataset(s). This generated DAG can then serve as the foundational causal specification for +your causal model. + +.. note:: + Automated causal discovery is a starting point. The resulting DAG must always be manually inspected to ensure it + is a valid representation of your system. + +Configuration Options +--------------------- +The tool supports various configurations to tailor the discovery process to your specific dataset and domain knowledge: + +* **Fitness Functions** Choose between score-based and multi-objective fitness functions. + +* **Search Constraints (Iterations)** + Adjust how long the algorithm runs, which is particularly useful for datasets with a large number of variables. + + * *Default limits:* 100 maximum iterations, and 20 maximum iterations without improvement. + +* **Domain Knowledge (Edge Constraints)** + You can explicitly include or exclude specific edges in the output DAG using dot files. Regular expressions (regex) + are supported. + + **Example:** Consider the DAG for the `vaccinating the elderly + `_ + modelling scenario. This scenario has two inputs: ``vaccine`` and ``max_doses``. + + .. container:: zoom-container + + .. image:: ../../../examples/covasim_/vaccinating_elderly/dag.png + :class: zoomable-image + :alt: Causal DAG of the vaccinating the elderly modelling scenario + + * **Excluding Edges:** If domain knowledge dictates that ``max_doses`` has no causal effect on any outputs, you + can specify ``max_doses -> ".*"`` and ``".*" -> max_doses`` in the *exclude edges* dot file. + * **Including Edges:** If it is known that ``vaccine`` directly affects all three outputs, you can specify + ``vaccine -> "cum_.*"`` in the *include edges* dot file. \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 7ac068b9..96d73b00 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,21 +15,22 @@ requires-python = ">=3.11,<3.15" license = { text = "MIT" } keywords = ["causal inference","verification"] dependencies = [ -"lifelines", -"networkx>=3.4,<3.7", -"numpy>=1.26.0,<=2.4.4", -"pandas>=2.1", -"scikit_learn~=1.4", -"scipy>=1.12.0,<=1.17.1", -"statsmodels~=0.14", -"tabulate~=0.9", -"pydot~=2.0", -"pygad~=3.3", -"deap~=1.4.1", -"sympy~=1.14.0", -"pyarrow>=19.0.1,<25", -"fastparquet>=2024.11.0", -"tqdm~=4.67.1" + "lifelines", + "networkx>=3.4,<3.7", + "numpy>=1.26.0,<=2.4.4", + "pandas>=2.1", + "scikit_learn~=1.4", + "scipy>=1.12.0,<=1.17.1", + "statsmodels~=0.14", + "tabulate~=0.9", + "pydot~=2.0", + "pygad~=3.3", + "deap~=1.4.1", + "sympy~=1.14.0", + "pyarrow>=19.0.1,<25", + "fastparquet>=2024.11.0", + "tqdm~=4.67.1", + "rustworkx>=0.18.0", ] dynamic = ["version"] @@ -80,6 +81,10 @@ Positive = "causal_testing.testing.causal_effect:Positive" Negative = "causal_testing.testing.causal_effect:Negative" ExactValue = "causal_testing.testing.causal_effect:ExactValue" +[project.entry-points."discovery"] +HillClimberDiscovery = "causal_testing.discovery.hill_climber_discovery:HillClimberDiscovery" +NSGADiscovery = "causal_testing.discovery.nsga_discovery:NSGADiscovery" + [tool.setuptools.packages] find = {} diff --git a/tests/discovery_tests/test_hill_climber.py b/tests/discovery_tests/test_hill_climber.py new file mode 100644 index 00000000..48a93c90 --- /dev/null +++ b/tests/discovery_tests/test_hill_climber.py @@ -0,0 +1,87 @@ +import unittest + +from causal_testing.discovery.hill_climber import simple_cycle, remove_cycles +from causal_testing.specification.causal_dag import CausalDAG + + +class TestHillClimber(unittest.TestCase): + + def setUp(self): + pass + + def test_simple_cycle_normal(self): + dag = CausalDAG() + dag.add_edges_from([("A", "B"), ("B", "C")]) + dag.add_edge("C", "A", ignore_cycles=True) + + cycle = simple_cycle(dag) + self.assertEqual(set(cycle), {("A", "B"), ("B", "C"), ("C", "A")}) + + def test_simple_cycle_no_cycles(self): + dag = CausalDAG() + dag.add_edges_from([("A", "B"), ("B", "C")]) + + cycle = simple_cycle(dag) + self.assertEqual(set(cycle), set()) + + def test_remove_cycles_normal(self): + dag = CausalDAG() + dag.add_nodes_from(["A", "B", "C"]) + dag.add_edges_from([("A", "B"), ("B", "C")]) + dag.add_edge("C", "A", ignore_cycles=True) + + remove_cycles(dag, included_edges=set()) + self.assertEqual(len(dag.edges()), 2) + + def test_remove_cycles_respects_included_edges(self): + dag = CausalDAG() + dag.add_nodes_from(["A", "B", "C"]) + dag.add_edges_from([("A", "B"), ("B", "C")]) + dag.add_edge("C", "A", ignore_cycles=True) + + included_edges = {("A", "B"), ("B", "C")} + remove_cycles(dag, included_edges) + self.assertFalse(dag.has_edge("C", "A")) + + def test_remove_cycles_no_cycles_present(self): + dag = CausalDAG() + dag.add_nodes_from(["A", "B"]) + dag.add_edges_from([("A", "B")]) + + remove_cycles(dag, included_edges=set()) + self.assertEqual(len(dag.edges()), 1) + + def test_remove_cycles_multiple_cycles(self): + dag = CausalDAG() + dag.add_nodes_from(["A", "B", "C", "D"]) + dag.add_edges_from([("A", "B"), ("C", "D")]) + dag.add_edge("B", "A", ignore_cycles=True) + dag.add_edge("D", "C", ignore_cycles=True) + + remove_cycles(dag, included_edges=set()) + self.assertEqual(len(dag.edges()), 2) + self.assertTrue(dag.has_edge("A", "B") or dag.has_edge("B", "A")) + self.assertTrue(dag.has_edge("C", "D") or dag.has_edge("D", "C")) + + def test_estimate_effect(self): + # Test the estimate_effect function + pass + + def test_evaluate_tests(self): + # Test the evaluate_dag function + pass + + def test_normalize_counts(self): + # Test the normalize_counts function + pass + + def test_evaluate_fitness(self): + # Test the evaluate_fitness function + pass + + def test_evolve_dag(self): + # Test the evolve_dag function + pass + + def tearDown(self): + pass diff --git a/tests/main_tests/test_main.py b/tests/main_tests/test_main.py index 14d92f88..6283cc24 100644 --- a/tests/main_tests/test_main.py +++ b/tests/main_tests/test_main.py @@ -18,6 +18,8 @@ def setUp(self): self.data_paths = ["tests/resources/data/data.csv"] self.test_config_path = "tests/resources/data/tests.json" self.output_path = Path("results/results.json") + self.include_edges_path = Path("tests/resources/data/include_edges.dot") + self.exclude_edges_path = Path("tests/resources/data/exclude_edges.dot") def test_missing_dag(self): with self.assertRaises(FileNotFoundError) as e: @@ -38,6 +40,30 @@ def test_output_file_created(self): self.assertFalse(self.output_path.parent.exists()) CausalTestingPaths(self.dag_path, self.data_paths, self.test_config_path, self.output_path).validate_paths() self.assertTrue(self.output_path.parent.exists()) + + def test_missing_include_edges(self): + with self.assertRaises(FileNotFoundError) as e: + CausalTestingPaths( + self.dag_path, + self.data_paths, + self.test_config_path, + self.output_path, + "missing.dot", + self.exclude_edges_path, + ).validate_paths() + self.assertEqual("Data file not found: missing.dot", str(e.exception)) + + def test_missing_exclude_edges(self): + with self.assertRaises(FileNotFoundError) as e: + CausalTestingPaths( + self.dag_path, + self.data_paths, + self.test_config_path, + self.output_path, + self.include_edges_path, + "missing.dot", + ).validate_paths() + self.assertEqual("Data file not found: missing.dot", str(e.exception)) def tearDown(self): if self.output_path.parent.exists(): @@ -50,11 +76,15 @@ def setUp(self): self.data_paths = ["tests/resources/data/data.csv"] self.test_config_path = "tests/resources/data/tests.json" self.output_path = Path("results/results.json") + self.include_edges_path = "tests/resources/data/include_edges.dot" + self.exclude_edges_path = "tests/resources/data/exclude_edges.dot" self.paths = CausalTestingPaths( dag_path=self.dag_path, data_paths=self.data_paths, test_config_path=self.test_config_path, output_path=self.output_path, + include_edges_path=self.include_edges_path, + exclude_edges_path=self.exclude_edges_path, ) def test_load_data(self): @@ -572,6 +602,32 @@ def test_parse_args_generation_non_default(self): main() self.assertTrue(os.path.exists(os.path.join(tmp, "tests_non_default.json"))) + def test_parse_args_discover(self): + with tempfile.TemporaryDirectory() as tmp: + with patch( + "sys.argv", + [ + "causal_testing", + "discover", + "--data-paths", + str(self.data_paths[0]), + str(self.data_paths[0]), + "--output", + os.path.join(tmp, "discovered_dag.dot"), + "--include-edges", + self.include_edges_path, + "--exclude-edges", + self.exclude_edges_path, + "-s", + "-m", + "200", + "-M", + "15", + ], + ): + main() + self.assertTrue(os.path.exists(os.path.join(tmp, "discovered_dag.dot"))) + def tearDown(self): if self.output_path.parent.exists(): shutil.rmtree(self.output_path.parent) diff --git a/tests/resources/data/exclude_edges.dot b/tests/resources/data/exclude_edges.dot new file mode 100644 index 00000000..7e4de036 --- /dev/null +++ b/tests/resources/data/exclude_edges.dot @@ -0,0 +1,3 @@ +strict digraph "" { + A -> B; +} diff --git a/tests/resources/data/include_edges.dot b/tests/resources/data/include_edges.dot new file mode 100644 index 00000000..7e4de036 --- /dev/null +++ b/tests/resources/data/include_edges.dot @@ -0,0 +1,3 @@ +strict digraph "" { + A -> B; +}