Data Processing

Data processors for variance reduction and data transformation.

BucketProcessor

BucketProcessor

Bases: BaseDataProcessor

Processor that assigns rows to buckets based on a hash of a source column.

This processor creates a new column with bucket assignments by hashing the values in a source column and taking the modulo with the number of buckets. The same source value always maps to the same bucket within a run; note that Python's built-in hash is salted per process for strings, so assignments are reproducible across runs only if PYTHONHASHSEED is fixed.

Examples:

import pandas as pd
from aboba.processing.bucket_processor import BucketProcessor

# Create sample data
data = pd.DataFrame({
    'user_id': ['user1', 'user2', 'user3', 'user4', 'user5']
})

# Assign users to 3 buckets
processor = BucketProcessor(
    source_column='user_id',
    result_column='bucket',
    n_buckets=3
)
processed_data, _ = processor.transform(data)
print(processed_data)
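Since Python's built-in hash is salted per process for strings, the bucket assignments in the example above are reproducible across runs only if PYTHONHASHSEED is fixed. A deterministic alternative (an illustrative sketch using hashlib, not part of the library) could look like:

```python
import hashlib

import pandas as pd

def stable_bucket(value, n_buckets: int) -> int:
    """Deterministic bucket from an MD5 digest: stable across processes and runs."""
    digest = hashlib.md5(str(value).encode("utf-8")).hexdigest()
    return int(digest, 16) % n_buckets

data = pd.DataFrame({'user_id': ['user1', 'user2', 'user3', 'user4', 'user5']})
data['bucket'] = data['user_id'].map(lambda v: stable_bucket(v, 3))
```

Cryptographic strength is irrelevant here; MD5 is used only because its digest is stable and well distributed.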
Source code in aboba/processing/bucket_processor.py
class BucketProcessor(BaseDataProcessor):
    """
    Processor that assigns rows to buckets based on a hash of a source column.

    This processor creates a new column with bucket assignments by hashing the
    values in a source column and taking the modulo with the number of buckets.
    The same source value always maps to the same bucket within a run; note that
    Python's built-in hash is salted per process for strings, so assignments are
    reproducible across runs only if PYTHONHASHSEED is fixed.

    Examples:
        ```python
        import pandas as pd
        from aboba.processing.bucket_processor import BucketProcessor

        # Create sample data
        data = pd.DataFrame({
            'user_id': ['user1', 'user2', 'user3', 'user4', 'user5']
        })

        # Assign users to 3 buckets
        processor = BucketProcessor(
            source_column='user_id',
            result_column='bucket',
            n_buckets=3
        )
        processed_data, _ = processor.transform(data)
        print(processed_data)
        ```
    """

    def __init__(self, source_column: str, result_column: str, n_buckets: int):
        super().__init__()
        self.source_column = source_column
        self.result_column = result_column
        self.n_buckets = n_buckets

    def transform(self, data: pd.DataFrame) -> Tuple[pd.DataFrame, Optional[Dict]]:
        data = data.copy()
        data[self.result_column] = data[self.source_column].map(hash).abs() % self.n_buckets
        return data, None

CupedProcessor

CupedProcessor

Bases: BaseDataProcessor

Performs CUPED (Controlled-experiment Using Pre-Experiment Data) transformation.

CUPED is a variance reduction technique that uses pre-experiment data (covariates) to improve the sensitivity of A/B tests. It adjusts the target metric using information from covariates that are correlated with the target metric.

The transformation fits a linear regression Y = a + b^T X (without a treatment variable) and produces Z = Y - b^T X. Note that the intercept is not subtracted, so the mean of Z equals the intercept, preserving interpretability.

For RelativeIndependentTTest compatibility, saves the original control group mean in artifacts for correct relative effect calculation.
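These two properties (the mean of Z equals the fitted intercept, and the variance drops when the covariates are correlated with the target) can be checked with a few lines of NumPy, mirroring what the processor does internally (an illustrative sketch, independent of the library):

```python
import numpy as np

rng = np.random.default_rng(0)
n = 10_000
x = rng.normal(100, 10, n)                 # covariate
y = 3.0 + 2.0 * x + rng.normal(0, 5, n)    # target correlated with the covariate

# Fit Y = a + b*X by ordinary least squares
X = np.column_stack([np.ones(n), x])
(a_hat, b_hat), *_ = np.linalg.lstsq(X, y, rcond=None)

# Z = Y - b*X: the slope contribution is removed, the intercept is kept
z = y - b_hat * x

assert np.isclose(z.mean(), a_hat)  # mean of Z equals the fitted intercept
assert z.var() < y.var()            # variance is reduced
```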

Examples:

import pandas as pd
import numpy as np
from aboba.processing.cuped_processor import CupedProcessor

# Create sample data with pre-experiment data
np.random.seed(42)
n = 1000
# Pre-experiment metric (covariate)
pre_metric = np.random.normal(100, 10, n)
# Post-experiment metric (target) - correlated with pre-metric
post_metric = 2 * pre_metric + np.random.normal(0, 5, n)
# Add treatment effect for second half
post_metric[n//2:] += 5

data = pd.DataFrame({
    'group': [0] * (n//2) + [1] * (n//2),
    'pre_metric': pre_metric,
    'post_metric': post_metric
})

# Apply CUPED transformation (single covariate)
processor = CupedProcessor(
    value_column='post_metric',
    covariate_columns='pre_metric',
    result_column='cuped_metric',
    group_column='group',
    group_control=0,
)
processed_data, artifacts = processor.transform(data)
print(f"Original variance: {data['post_metric'].var():.2f}")
print(f"CUPED variance: {processed_data['cuped_metric'].var():.2f}")
if 'cuped_original_control_mean' in artifacts:
    print(f"Original control mean: {artifacts['cuped_original_control_mean']:.2f}")
Source code in aboba/processing/cuped_processor.py
class CupedProcessor(BaseDataProcessor):
    """
    Performs CUPED (Controlled-experiment Using Pre-Experiment Data) transformation.

    CUPED is a variance reduction technique that uses pre-experiment data (covariates)
    to improve the sensitivity of A/B tests. It adjusts the target metric using
    information from covariates that are correlated with the target metric.

    The transformation fits a linear regression Y = a + b^T X (without a treatment
    variable) and produces Z = Y - b^T X. Note that the intercept is *not* subtracted,
    so the mean of Z equals the intercept, preserving interpretability.

    For RelativeIndependentTTest compatibility, saves the original control group mean
    in artifacts for correct relative effect calculation.

    Examples:
        ```python
        import pandas as pd
        import numpy as np
        from aboba.processing.cuped_processor import CupedProcessor

        # Create sample data with pre-experiment data
        np.random.seed(42)
        n = 1000
        # Pre-experiment metric (covariate)
        pre_metric = np.random.normal(100, 10, n)
        # Post-experiment metric (target) - correlated with pre-metric
        post_metric = 2 * pre_metric + np.random.normal(0, 5, n)
        # Add treatment effect for second half
        post_metric[n//2:] += 5

        data = pd.DataFrame({
            'group': [0] * (n//2) + [1] * (n//2),
            'pre_metric': pre_metric,
            'post_metric': post_metric
        })

        # Apply CUPED transformation (single covariate)
        processor = CupedProcessor(
            value_column='post_metric',
            covariate_columns='pre_metric',
            result_column='cuped_metric',
            group_column='group',
            group_control=0,
        )
        processed_data, artifacts = processor.transform(data)
        print(f"Original variance: {data['post_metric'].var():.2f}")
        print(f"CUPED variance: {processed_data['cuped_metric'].var():.2f}")
        if 'cuped_original_control_mean' in artifacts:
            print(f"Original control mean: {artifacts['cuped_original_control_mean']:.2f}")
        ```
    """

    def __init__(
        self,
        value_column: str,
        covariate_columns: Union[str, List[str]],
        result_column: str,
        group_column: Optional[str] = None,
        group_test: Any = None,
        group_control: Any = None,
    ):
        """
        Initialize the CUPED processor.

        Args:
            value_column (str): Name of the column with the target metric values.
            covariate_columns (Union[str, List[str]]): Name(s) of the column(s) with
                covariate values. Accepts a single column name or a list of names.
            result_column (str): Name of the column to store the CUPED-transformed values.
            group_column (Optional[str]): Column name for group identification. 
                Required for saving original control mean when using with RelativeIndependentTTest.
            group_test (Any): Value in group_column that identifies test group (reserved for future use).
            group_control (Any): Value in group_column that identifies control group.
                Required for saving original control mean when using with RelativeIndependentTTest.
        """

        super().__init__()
        self.value_column = value_column
        if isinstance(covariate_columns, str):
            self.covariate_columns = [covariate_columns]
        else:
            self.covariate_columns = list(covariate_columns)
        self.result_column = result_column
        self.group_column = group_column
        self.group_test = group_test
        self.group_control = group_control

    def transform(self, data: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        """
        Apply CUPED transformation to the data.

        Fits OLS regression Y = a + b^T X on all rows (no treatment variable),
        then computes Z = Y - b^T X (subtracts only the slope contribution).

        Saves the original control group mean in artifacts for proper relative effect calculation
        when group_column and group_control are specified.

        Args:
            data (pd.DataFrame): DataFrame to transform.

        Returns:
            Tuple[pd.DataFrame, Dict[str, Any]]: 
                - Transformed DataFrame with CUPED values
                - Artifacts dict containing:
                  * 'cuped_original_control_mean': Original mean of control group (if group info provided)
                  * 'cuped_value_column': Name of the original value column
        """
        X = data[self.covariate_columns].to_numpy(float)
        Y = data[self.value_column].to_numpy(float)

        # Fit OLS with intercept: Y = a + b^T X
        X_with_intercept = np.column_stack([np.ones(len(X)), X])
        b, _, _, _ = np.linalg.lstsq(X_with_intercept, Y, rcond=None)
        # b[0] = intercept, b[1:] = slopes

        # Z = Y - b^T X  (intercept is not subtracted)
        Z = Y - X @ b[1:]

        data = data.copy()
        data[self.result_column] = Z

        # Save original control group mean for relative tests
        artifacts = {}

        if self.group_column is not None and self.group_control is not None:
            # Check if group column exists in data
            if self.group_column in data.columns:
                control_mask = data[self.group_column] == self.group_control
                if control_mask.any():
                    original_control_mean = Y[control_mask].mean()
                    artifacts['cuped_original_control_mean'] = original_control_mean
                    artifacts['cuped_value_column'] = self.value_column

        return data, artifacts

__init__

__init__(value_column: str, covariate_columns: Union[str, List[str]], result_column: str, group_column: Optional[str] = None, group_test: Any = None, group_control: Any = None)

Initialize the CUPED processor.

PARAMETER DESCRIPTION
value_column

Name of the column with the target metric values.

TYPE: str

covariate_columns

Name(s) of the column(s) with covariate values. Accepts a single column name or a list of names.

TYPE: Union[str, List[str]]

result_column

Name of the column to store the CUPED-transformed values.

TYPE: str

group_column

Column name for group identification. Required for saving original control mean when using with RelativeIndependentTTest.

TYPE: Optional[str] DEFAULT: None

group_test

Value in group_column that identifies test group (reserved for future use).

TYPE: Any DEFAULT: None

group_control

Value in group_column that identifies control group. Required for saving original control mean when using with RelativeIndependentTTest.

TYPE: Any DEFAULT: None

Source code in aboba/processing/cuped_processor.py
def __init__(
    self,
    value_column: str,
    covariate_columns: Union[str, List[str]],
    result_column: str,
    group_column: Optional[str] = None,
    group_test: Any = None,
    group_control: Any = None,
):
    """
    Initialize the CUPED processor.

    Args:
        value_column (str): Name of the column with the target metric values.
        covariate_columns (Union[str, List[str]]): Name(s) of the column(s) with
            covariate values. Accepts a single column name or a list of names.
        result_column (str): Name of the column to store the CUPED-transformed values.
        group_column (Optional[str]): Column name for group identification. 
            Required for saving original control mean when using with RelativeIndependentTTest.
        group_test (Any): Value in group_column that identifies test group (reserved for future use).
        group_control (Any): Value in group_column that identifies control group.
            Required for saving original control mean when using with RelativeIndependentTTest.
    """

    super().__init__()
    self.value_column = value_column
    if isinstance(covariate_columns, str):
        self.covariate_columns = [covariate_columns]
    else:
        self.covariate_columns = list(covariate_columns)
    self.result_column = result_column
    self.group_column = group_column
    self.group_test = group_test
    self.group_control = group_control

transform

transform(data: DataFrame) -> Tuple[pd.DataFrame, Dict[str, Any]]

Apply CUPED transformation to the data.

Fits OLS regression Y = a + b^T X on all rows (no treatment variable), then computes Z = Y - b^T X (subtracts only the slope contribution).

Saves the original control group mean in artifacts for proper relative effect calculation when group_column and group_control are specified.

PARAMETER DESCRIPTION
data

DataFrame to transform.

TYPE: DataFrame

RETURNS DESCRIPTION
Tuple[DataFrame, Dict[str, Any]]

Tuple[pd.DataFrame, Dict[str, Any]]:

- Transformed DataFrame with CUPED values
- Artifacts dict containing:
  - 'cuped_original_control_mean': Original mean of control group (if group info provided)
  - 'cuped_value_column': Name of the original value column

Source code in aboba/processing/cuped_processor.py
def transform(self, data: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Apply CUPED transformation to the data.

    Fits OLS regression Y = a + b^T X on all rows (no treatment variable),
    then computes Z = Y - b^T X (subtracts only the slope contribution).

    Saves the original control group mean in artifacts for proper relative effect calculation
    when group_column and group_control are specified.

    Args:
        data (pd.DataFrame): DataFrame to transform.

    Returns:
        Tuple[pd.DataFrame, Dict[str, Any]]: 
            - Transformed DataFrame with CUPED values
            - Artifacts dict containing:
              * 'cuped_original_control_mean': Original mean of control group (if group info provided)
              * 'cuped_value_column': Name of the original value column
    """
    X = data[self.covariate_columns].to_numpy(float)
    Y = data[self.value_column].to_numpy(float)

    # Fit OLS with intercept: Y = a + b^T X
    X_with_intercept = np.column_stack([np.ones(len(X)), X])
    b, _, _, _ = np.linalg.lstsq(X_with_intercept, Y, rcond=None)
    # b[0] = intercept, b[1:] = slopes

    # Z = Y - b^T X  (intercept is not subtracted)
    Z = Y - X @ b[1:]

    data = data.copy()
    data[self.result_column] = Z

    # Save original control group mean for relative tests
    artifacts = {}

    if self.group_column is not None and self.group_control is not None:
        # Check if group column exists in data
        if self.group_column in data.columns:
            control_mask = data[self.group_column] == self.group_control
            if control_mask.any():
                original_control_mean = Y[control_mask].mean()
                artifacts['cuped_original_control_mean'] = original_control_mean
                artifacts['cuped_value_column'] = self.value_column

    return data, artifacts

RegressionProcessor

RegressionProcessor

Bases: BaseDataProcessor

Performs regression-based transformation for CUPED or other adjustments.

This processor uses linear regression to adjust target metrics based on covariates. It can be used for CUPED transformation or other regression-based adjustments. Note: the current implementation is a stub; __init__ raises NotImplementedError and transform returns the data unchanged with a warning.

Examples:

import pandas as pd
import numpy as np
from aboba.processing.regression_processor import RegressionProcessor

# Create sample data
np.random.seed(42)
n = 1000
covariate1 = np.random.normal(0, 1, n)
covariate2 = np.random.normal(0, 1, n)
# Target variable with some relationship to covariates
target = 2 * covariate1 + 1.5 * covariate2 + np.random.normal(0, 0.5, n)
# Add group column
group = ['control'] * (n//2) + ['test'] * (n//2)
# Add treatment effect
target[n//2:] += 2

data = pd.DataFrame({
    'target': target,
    'covariate1': covariate1,
    'covariate2': covariate2,
    'group': group
})

# Apply regression adjustment (note: the current implementation raises NotImplementedError)
processor = RegressionProcessor(
    value_column='target',
    covariate_columns=['covariate1', 'covariate2'],
    result_column='adjusted_target',
    group_column='group',
    group_test='test',
    group_control='control'
)
processed_data, _ = processor.transform(data)
print(f"Original mean: {data['target'].mean():.2f}")
print(f"Adjusted mean: {processed_data['adjusted_target'].mean():.2f}")
Source code in aboba/processing/regression_processor.py
class RegressionProcessor(BaseDataProcessor):
    """
    Performs regression-based transformation for CUPED or other adjustments.

    This processor uses linear regression to adjust target metrics based on
    covariates. It can be used for CUPED transformation or other regression-based
    adjustments.

    Examples:
        ```python
        import pandas as pd
        import numpy as np
        from aboba.processing.regression_processor import RegressionProcessor

        # Create sample data
        np.random.seed(42)
        n = 1000
        covariate1 = np.random.normal(0, 1, n)
        covariate2 = np.random.normal(0, 1, n)
        # Target variable with some relationship to covariates
        target = 2 * covariate1 + 1.5 * covariate2 + np.random.normal(0, 0.5, n)
        # Add group column
        group = ['control'] * (n//2) + ['test'] * (n//2)
        # Add treatment effect
        target[n//2:] += 2

        data = pd.DataFrame({
            'target': target,
            'covariate1': covariate1,
            'covariate2': covariate2,
            'group': group
        })

        # Apply regression adjustment (note: the current implementation raises NotImplementedError)
        processor = RegressionProcessor(
            value_column='target',
            covariate_columns=['covariate1', 'covariate2'],
            result_column='adjusted_target',
            group_column='group',
            group_test='test',
            group_control='control'
        )
        processed_data, _ = processor.transform(data)
        print(f"Original mean: {data['target'].mean():.2f}")
        print(f"Adjusted mean: {processed_data['adjusted_target'].mean():.2f}")
        ```
    """

    def __init__(
        self,
        value_column: str,
        covariate_columns: List[str],
        result_column: str,
        group_column: Optional[str] = None,
        group_test: Any = None,
        group_control: Any = None,
    ):
        """
        Initialize the RegressionProcessor.

        Args:
            value_column (str): Name of the column with the target values.
            covariate_columns (List[str]): List of column names with covariate values.
            result_column (str): Name of the column to store the adjusted values.
            group_column (Optional[str]): Name of the column with group labels.
                If None, all data is used for regression.
            group_test (Any): Value in group_column that identifies the test group.
            group_control (Any): Value in group_column that identifies the control group.
        """

        raise NotImplementedError("Not implemented because of unfixed bugs")

        super().__init__()
        self.value_column = value_column
        self.covariate_columns = covariate_columns
        self.result_column = result_column

        if group_column is not None:
            assert group_test is not None
            assert group_control is not None

        self.group_column = group_column
        self.group_test = group_test
        self.group_control = group_control

    def transform(self, data: pd.DataFrame):
        """
        Apply regression transformation to the data.

        Args:
            data (pd.DataFrame): DataFrame to transform.

        Returns:
            Tuple[pd.DataFrame, Dict]: Transformed DataFrame and empty artifacts dict.
        """
        # This implementation needs to be fixed - the current code has undefined variables
        # and doesn't match the class purpose. A proper implementation would:
        # 1. Fit a regression model on control group data
        # 2. Use the model to predict values for all data
        # 3. Adjust the target values based on the predictions
        #
        # For now, we'll return the data unchanged with a warning
        import warnings
        warnings.warn("RegressionProcessor.transform() is not properly implemented")
        return data, dict()
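The three steps outlined in the comment inside transform() could be realized as a standalone function. This is a hypothetical sketch under those assumptions, not the library's implementation; the name regression_adjust is invented for illustration:

```python
import numpy as np
import pandas as pd

def regression_adjust(data, value_column, covariate_columns,
                      result_column, group_column, group_control):
    """Fit OLS on the control group, then remove the covariate-explained
    part of the target for all rows (the intercept is kept)."""
    out = data.copy()
    control = out[out[group_column] == group_control]

    # 1. Fit a regression model on control group data
    Xc = np.column_stack(
        [np.ones(len(control)), control[covariate_columns].to_numpy(float)]
    )
    yc = control[value_column].to_numpy(float)
    coef, *_ = np.linalg.lstsq(Xc, yc, rcond=None)

    # 2. Use the model to predict the covariate contribution for all rows
    covariate_part = out[covariate_columns].to_numpy(float) @ coef[1:]

    # 3. Adjust the target values based on the predictions
    out[result_column] = out[value_column].to_numpy(float) - covariate_part
    return out
```

Fitting on the control group only keeps the treatment effect out of the slope estimates, at the cost of a noisier fit than pooling all rows.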

__init__

__init__(value_column: str, covariate_columns: List[str], result_column: str, group_column: Optional[str] = None, group_test: Any = None, group_control: Any = None)

Initialize the RegressionProcessor.

PARAMETER DESCRIPTION
value_column

Name of the column with the target values.

TYPE: str

covariate_columns

List of column names with covariate values.

TYPE: List[str]

result_column

Name of the column to store the adjusted values.

TYPE: str

group_column

Name of the column with group labels. If None, all data is used for regression.

TYPE: Optional[str] DEFAULT: None

group_test

Value in group_column that identifies the test group.

TYPE: Any DEFAULT: None

group_control

Value in group_column that identifies the control group.

TYPE: Any DEFAULT: None

Source code in aboba/processing/regression_processor.py
def __init__(
    self,
    value_column: str,
    covariate_columns: List[str],
    result_column: str,
    group_column: Optional[str] = None,
    group_test: Any = None,
    group_control: Any = None,
):
    """
    Initialize the RegressionProcessor.

    Args:
        value_column (str): Name of the column with the target values.
        covariate_columns (List[str]): List of column names with covariate values.
        result_column (str): Name of the column to store the adjusted values.
        group_column (Optional[str]): Name of the column with group labels.
            If None, all data is used for regression.
        group_test (Any): Value in group_column that identifies the test group.
        group_control (Any): Value in group_column that identifies the control group.
    """

    raise NotImplementedError("Not implemented because of unfixed bugs")

    super().__init__()
    self.value_column = value_column
    self.covariate_columns = covariate_columns
    self.result_column = result_column

    if group_column is not None:
        assert group_test is not None
        assert group_control is not None

    self.group_column = group_column
    self.group_test = group_test
    self.group_control = group_control

transform

transform(data: DataFrame)

Apply regression transformation to the data.

PARAMETER DESCRIPTION
data

DataFrame to transform.

TYPE: DataFrame

RETURNS DESCRIPTION

Tuple[pd.DataFrame, Dict]: Transformed DataFrame and empty artifacts dict.

Source code in aboba/processing/regression_processor.py
def transform(self, data: pd.DataFrame):
    """
    Apply regression transformation to the data.

    Args:
        data (pd.DataFrame): DataFrame to transform.

    Returns:
        Tuple[pd.DataFrame, Dict]: Transformed DataFrame and empty artifacts dict.
    """
    # This implementation needs to be fixed - the current code has undefined variables
    # and doesn't match the class purpose. A proper implementation would:
    # 1. Fit a regression model on control group data
    # 2. Use the model to predict values for all data
    # 3. Adjust the target values based on the predictions
    #
    # For now, we'll return the data unchanged with a warning
    import warnings
    warnings.warn("RegressionProcessor.transform() is not properly implemented")
    return data, dict()

RenameColsProcessor

RenameColsProcessor

Bases: BaseDataProcessor

Renames columns in a DataFrame according to a mapping dictionary.

This processor takes a dictionary that maps old column names to new column names and applies the renaming to the DataFrame.

Examples:

import pandas as pd
from aboba.processing.rename_cols_processor import RenameColsProcessor

# Create sample data
data = pd.DataFrame({
    'old_name1': [1, 2, 3],
    'old_name2': ['a', 'b', 'c']
})

# Rename columns
processor = RenameColsProcessor({
    'old_name1': 'new_name1',
    'old_name2': 'new_name2'
})
processed_data, _ = processor.transform(data)
print(processed_data.columns.tolist())
Source code in aboba/processing/rename_cols_processor.py
class RenameColsProcessor(BaseDataProcessor):
    """
    Renames columns in a DataFrame according to a mapping dictionary.

    This processor takes a dictionary that maps old column names to new column
    names and applies the renaming to the DataFrame.

    Examples:
        ```python
        import pandas as pd
        from aboba.processing.rename_cols_processor import RenameColsProcessor

        # Create sample data
        data = pd.DataFrame({
            'old_name1': [1, 2, 3],
            'old_name2': ['a', 'b', 'c']
        })

        # Rename columns
        processor = RenameColsProcessor({
            'old_name1': 'new_name1',
            'old_name2': 'new_name2'
        })
        processed_data, _ = processor.transform(data)
        print(processed_data.columns.tolist())
        ```
    """

    def __init__(self, names_mapping: Dict[str, str]):
        super().__init__()
        self.names_mapping = names_mapping

    def transform(self, data: pd.DataFrame):
        data = data.rename(columns=self.names_mapping)
        return data, dict()

EnsureColsProcessor

EnsureColsProcessor

Bases: BaseDataProcessor

Verifies that specified columns are present in dataframe.

This processor checks that all required columns are present in the DataFrame and raises an assertion error if any are missing. It's useful for validating data before processing.

Examples:

import pandas as pd
from aboba.processing.ensure_cols_processor import EnsureColsProcessor

# Create sample data
data = pd.DataFrame({
    'col1': [1, 2, 3],
    'col2': ['a', 'b', 'c']
})

# Verify required columns are present
processor = EnsureColsProcessor(['col1', 'col2'])
processed_data, _ = processor.transform(data)  # No error
print("Columns verified successfully")

# This would raise an AssertionError:
# processor = EnsureColsProcessor(['col1', 'missing_col'])
# processor.transform(data)
Source code in aboba/processing/ensure_cols_processor.py
class EnsureColsProcessor(BaseDataProcessor):
    """
    Verifies that specified columns are present in dataframe.

    This processor checks that all required columns are present in the DataFrame
    and raises an assertion error if any are missing. It's useful for validating
    data before processing.

    Examples:
        ```python
        import pandas as pd
        from aboba.processing.ensure_cols_processor import EnsureColsProcessor

        # Create sample data
        data = pd.DataFrame({
            'col1': [1, 2, 3],
            'col2': ['a', 'b', 'c']
        })

        # Verify required columns are present
        processor = EnsureColsProcessor(['col1', 'col2'])
        processed_data, _ = processor.transform(data)  # No error
        print("Columns verified successfully")

        # This would raise an AssertionError:
        # processor = EnsureColsProcessor(['col1', 'missing_col'])
        # processor.transform(data)
        ```
    """

    def __init__(self, cols: tp.List[str]) -> None:
        super().__init__()
        self.cols = cols

    def transform(self, data: pd.DataFrame):
        """
        Verify that all required columns are present in the data.

        Args:
            data (pd.DataFrame): DataFrame to verify.

        Returns:
            Tuple[pd.DataFrame, Dict]: Original DataFrame and empty artifacts dict.

        Raises:
            AssertionError: If any required column is missing from the DataFrame.
        """
        for col in self.cols:
            assert (
                col in data.columns
            ), f"Expected column '{col}' in dataframe, got only {data.columns}"
        return data, dict()

transform

transform(data: DataFrame)

Verify that all required columns are present in the data.

PARAMETER DESCRIPTION
data

DataFrame to verify.

TYPE: DataFrame

RETURNS DESCRIPTION

Tuple[pd.DataFrame, Dict]: Original DataFrame and empty artifacts dict.

RAISES DESCRIPTION
AssertionError

If any required column is missing from the DataFrame.

Source code in aboba/processing/ensure_cols_processor.py
def transform(self, data: pd.DataFrame):
    """
    Verify that all required columns are present in the data.

    Args:
        data (pd.DataFrame): DataFrame to verify.

    Returns:
        Tuple[pd.DataFrame, Dict]: Original DataFrame and empty artifacts dict.

    Raises:
        AssertionError: If any required column is missing from the DataFrame.
    """
    for col in self.cols:
        assert (
            col in data.columns
        ), f"Expected column '{col}' in dataframe, got only {data.columns}"
    return data, dict()

PreprocessorPipeline

PreprocessorPipeline

Bases: BaseDataProcessor

Processes data through a sequence of preprocessing steps.

This pipeline applies a series of data processors in order, passing the output of each step as input to the next. It supports both fitting (for processors that need to learn from data) and transforming.

Examples:

import pandas as pd
from aboba.processing.preprocessor_pipeline import PreprocessorPipeline
from aboba.processing.rename_cols_processor import RenameColsProcessor
from aboba.processing.ensure_cols_processor import EnsureColsProcessor

# Create sample data
data = pd.DataFrame({
    'old_name': [1, 2, 3, 4, 5],
    'other_col': ['a', 'b', 'c', 'd', 'e']
})

# Create a pipeline with two steps
pipeline = PreprocessorPipeline([
    RenameColsProcessor({'old_name': 'new_name'}),
    EnsureColsProcessor(['new_name', 'other_col'])
], verbose=True)

# Fit and transform the data
processed_data, artifacts = pipeline.fit_transform(data)
print(processed_data.columns.tolist())
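Besides BaseDataProcessor instances, _process_step also accepts plain callables that take a DataFrame and return a (data, artifacts) tuple, so an ad-hoc step can be a simple function. A sketch (the function name is invented for illustration):

```python
import pandas as pd

def add_doubled_column(data: pd.DataFrame):
    """A plain-callable pipeline step: returns (transformed data, artifacts)."""
    data = data.copy()
    data['value_doubled'] = data['value'] * 2
    return data, {'added_column': 'value_doubled'}

# Such a function can sit directly in the `steps` list, e.g.
# PreprocessorPipeline([add_doubled_column, ...])
out, artifacts = add_doubled_column(pd.DataFrame({'value': [1, 2, 3]}))
```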
Source code in aboba/processing/preprocessor_pipeline.py
class PreprocessorPipeline(BaseDataProcessor):
    """
    Processes data through a sequence of preprocessing steps.

    This pipeline applies a series of data processors in order, passing the
    output of each step as input to the next. It supports both fitting
    (for processors that need to learn from data) and transforming.

    Examples:
        ```python
        import pandas as pd
        from aboba.processing.preprocessor_pipeline import PreprocessorPipeline
        from aboba.processing.rename_cols_processor import RenameColsProcessor
        from aboba.processing.ensure_cols_processor import EnsureColsProcessor

        # Create sample data
        data = pd.DataFrame({
            'old_name': [1, 2, 3, 4, 5],
            'other_col': ['a', 'b', 'c', 'd', 'e']
        })

        # Create a pipeline with two steps
        pipeline = PreprocessorPipeline([
            RenameColsProcessor({'old_name': 'new_name'}),
            EnsureColsProcessor(['new_name', 'other_col'])
        ], verbose=True)

        # Fit and transform the data
        processed_data, artifacts = pipeline.fit_transform(data)
        print(processed_data.columns.tolist())
        ```
    """

    def __init__(self, steps: List[DataProcessor], verbose: bool = False, **kwargs):
        self.steps = steps
        self.verbose = verbose
        self.tqdm_kwargs = kwargs


    def fit(self, data: pd.DataFrame):
        """
        This is called once on **all** available data; each processor step
        receives the processed data from the previous step.

        Args:
            data (pd.DataFrame): full data, with all groups
        """
        self._transform(data, fit=True)
        return self

    def transform(self, data: pd.DataFrame) -> Tuple[pd.DataFrame, Dict]:
        """

        Transforms data in steps. `fit` must be called before transforming.

        Args:
            data (pd.DataFrame): data to transform

        Returns:
            Tuple[pd.DataFrame, Dict]: tuple of processed data and artifacts dict
        """
        return self._transform(data, fit=False)

    def fit_transform(self, data: pd.DataFrame) -> Tuple[pd.DataFrame, Dict]:
        """

        Combination of fit and transform

        Args:
            data (pd.DataFrame): data to fit on and to transform

        Returns:
            Tuple[pd.DataFrame, Dict]: tuple with processed row and artefacts dict
        """
        return self._transform(data, fit=True)

    def _transform(self, data: pd.DataFrame, fit=False) -> Tuple[pd.DataFrame, Dict]:
        artifacts = dict()

        # Wrap the steps in a progress bar only when verbose output is requested
        itr = (
            tqdm(self.steps, desc="processing pipeline", **self.tqdm_kwargs)
            if self.verbose
            else self.steps
        )

        for step in itr:
            data, artifacts_new = self._process_step(step, data, fit=fit)
            self._merge_artifacts(artifacts, artifacts_new)
        return data, artifacts

    @staticmethod
    def _process_step(
        step: DataProcessor, data: pd.DataFrame, fit: bool
    ) -> Tuple[pd.DataFrame, Dict]:
        """
        Process a single step in the pipeline.

        Args:
            step (DataProcessor): The processor to apply.
            data (pd.DataFrame): Data to process.
            fit (bool): Whether to fit the processor before transforming.

        Returns:
            Tuple[pd.DataFrame, Dict]: Processed data and artifacts.
        """
        if isinstance(step, BaseDataProcessor):
            if fit:
                step.fit(data)
            data, artifacts_new = step.transform(data)
        elif callable(step):
            data, artifacts_new = step(data)
        else:
            raise TypeError("Data processor must be a DataProcessor or a callable")
        return data, artifacts_new

    @staticmethod
    def _merge_artifacts(artifacts, artifacts_new):
        """
        Merge artifacts from a new step into the existing artifacts.

        Args:
            artifacts (Dict): Existing artifacts.
            artifacts_new (Dict): New artifacts to merge.
        """
        for key in artifacts_new:
            if key in artifacts:
                logger.warning(
                    f"Overwriting existing artifact {key} in processing step!"
                )
            artifacts[key] = artifacts_new[key]
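When two steps emit an artifact under the same key, `_merge_artifacts` logs a warning and lets the later step win. A minimal self-contained sketch of that merge policy (the function and key names here are illustrative, not part of the library):

```python
import logging

logger = logging.getLogger(__name__)


def merge_artifacts(artifacts: dict, artifacts_new: dict) -> None:
    """Merge new artifacts in place; later steps overwrite earlier keys."""
    for key, value in artifacts_new.items():
        if key in artifacts:
            logger.warning("Overwriting existing artifact %s in processing step!", key)
        artifacts[key] = value


artifacts = {}
merge_artifacts(artifacts, {"scaler_mean": 4.2})
merge_artifacts(artifacts, {"scaler_mean": 5.0, "n_rows": 100})  # warns, overwrites
print(artifacts)  # {'scaler_mean': 5.0, 'n_rows': 100}
```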

fit

fit(data: DataFrame)

Called once on all available data; each processor step receives the processed data from the previous step.

PARAMETER DESCRIPTION
data

full data, with all groups

TYPE: DataFrame

Source code in aboba/processing/preprocessor_pipeline.py
def fit(self, data: pd.DataFrame):
    """
    This called once on **all** available data, each processor step
    receives processed data from the previous step

    Args:
        data (pd.DataFrame): full data, with all groups
    """
    self._transform(data, fit=True)
    return self
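The reason `fit` receives the full data is that a processor may learn statistics globally and then apply them to per-group slices later. A minimal self-contained sketch of a processor following that contract (the `MeanCenterProcessor` name is illustrative, not part of the library):

```python
import pandas as pd


class MeanCenterProcessor:
    """Learns a column mean on the full data, subtracts it on transform."""

    def __init__(self, column: str):
        self.column = column
        self.mean_ = None

    def fit(self, data: pd.DataFrame):
        # Learned on ALL groups, not just the slice transformed later
        self.mean_ = data[self.column].mean()
        return self

    def transform(self, data: pd.DataFrame):
        out = data.copy()
        out[self.column] = out[self.column] - self.mean_
        return out, {"mean": self.mean_}  # (data, artifacts) contract


data = pd.DataFrame({"group": ["a", "a", "b", "b"], "value": [1.0, 3.0, 5.0, 7.0]})
proc = MeanCenterProcessor("value").fit(data)  # fit once on all data
subset, artifacts = proc.transform(data[data["group"] == "a"])
print(subset["value"].tolist())  # [-3.0, -1.0], centered by the global mean 4.0
```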

transform

transform(data: DataFrame) -> Tuple[pd.DataFrame, Dict]

Transforms the data step by step. `fit` must be called before `transform`.

PARAMETER DESCRIPTION
data

data to transform

TYPE: DataFrame

RETURNS DESCRIPTION
Tuple[DataFrame, Dict]

Tuple[pd.DataFrame, Dict]: tuple with the processed data and an artifacts dict

Source code in aboba/processing/preprocessor_pipeline.py
def transform(self, data: pd.DataFrame) -> Tuple[pd.DataFrame, Dict]:
    """

    Transforms data in steps.
    Fit must be called before transforming

    Args:
        data (pd.DataFrame): data to fit on and to transform

    Returns:
        Tuple[pd.DataFrame, Dict]: tuple with processed row and artefacts dict
    """
    return self._transform(data, fit=False)
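As `_process_step` shows, a step does not have to be a `BaseDataProcessor`: any callable that takes a DataFrame and returns a `(data, artifacts)` tuple is accepted as well. A minimal self-contained sketch of such a callable step (the function name is hypothetical):

```python
import pandas as pd


def drop_nulls_step(data: pd.DataFrame):
    """A plain function usable as a pipeline step: returns (data, artifacts)."""
    cleaned = data.dropna()
    return cleaned, {"rows_dropped": len(data) - len(cleaned)}


data = pd.DataFrame({"x": [1.0, None, 3.0]})
cleaned, artifacts = drop_nulls_step(data)
print(len(cleaned), artifacts)  # 2 {'rows_dropped': 1}
```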

fit_transform

fit_transform(data: DataFrame) -> Tuple[pd.DataFrame, Dict]

Combination of `fit` and `transform`.

PARAMETER DESCRIPTION
data

data to fit on and to transform

TYPE: DataFrame

RETURNS DESCRIPTION
Tuple[DataFrame, Dict]

Tuple[pd.DataFrame, Dict]: tuple with the processed data and an artifacts dict

Source code in aboba/processing/preprocessor_pipeline.py
def fit_transform(self, data: pd.DataFrame) -> Tuple[pd.DataFrame, Dict]:
    """

    Combination of fit and transform

    Args:
        data (pd.DataFrame): data to fit on and to transform

    Returns:
        Tuple[pd.DataFrame, Dict]: tuple with processed row and artefacts dict
    """
    return self._transform(data, fit=True)