Skip to content

Pipeline

Pipeline for chaining multiple operations.

Pipeline

Pipeline

Bases: AbobaBase

A sequence of data preprocessors with one optional splitter.

Pipeline applies preprocessors to your data in sequence. If a splitter is given, then when its turn comes the data is sampled into groups, and the remaining preprocessors are applied to each group produced by the splitter.

The pipeline accumulates artifacts from all processors and returns them along with the transformed data. Supports both old-style processors (returning only data or (data,)) and new-style (returning (data, artifacts)).

Examples:

from aboba import tests
from aboba import splitters
from aboba import effect_modifiers
from aboba import processing
from aboba import pipelines

import numpy as np
import pandas as pd
import scipy.stats as sps

data_a = sps.norm.rvs(size=1000, loc=0, scale=100)
data_b = sps.norm.rvs(size=1000, loc=0, scale=100)

data_a_cov = data_a + sps.norm.rvs(size=1000, loc=0, scale=0.5)
data_b_cov = data_b + sps.norm.rvs(size=1000, loc=0, scale=0.5)

data_a_strat = sps.bernoulli.rvs(p=0.15, size=1000)
data_b_strat = sps.bernoulli.rvs(p=0.15, size=1000)

# dataset of four columns: value, covariate, strat and b_group
data = pd.DataFrame({
    'value': np.concatenate([
        data_a,
        data_b,
    ]),
    'covariate': np.concatenate([
        data_a_cov,
        data_b_cov,
    ]),
    'strat': np.concatenate([
        data_a_strat,
        data_b_strat,
    ]),
    'b_group': np.concatenate([
        np.repeat(0, 1000),
        np.repeat(1, 1000),
    ]),
})

cuped_preprocess = processing.CupedProcessor(
    value_column='value',
    covariate_columns='covariate',
    result_column='value',
    group_column='b_group',
    group_test=1,
    group_control=0,
)

ensurecol_preprocess = processing.EnsureColsProcessor(cols=['value', 'covariate'])

# `splitter` should be a splitter instance from `aboba.splitters` (its construction is omitted in this snippet)
pipeline_cuped = pipelines.Pipeline([
    ('cuped', cuped_preprocess),
    ('splitter', splitter),
    ('ensurecols', ensurecol_preprocess)
])

pipeline_cuped.fit(data)

result_data, artifacts = pipeline_cuped.transform(data)

# Or if you don't need artifacts:
result_data, _ = pipeline_cuped.transform(data)

# Check for CUPED artifacts
if 'cuped_original_control_mean' in artifacts:
    print(f"Original control mean: {artifacts['cuped_original_control_mean']}")
Source code in aboba/pipeline/pipeline.py
class Pipeline(AbobaBase):
    """
    A sequence of data preprocessors with one optional splitter.

    Pipeline applies preprocessors to your data in sequence. If a splitter is
    given, then when its turn comes the data is sampled into groups, and the
    remaining preprocessors are applied to each group produced by the splitter.

    The pipeline accumulates artifacts from all processors and returns them
    along with the transformed data. Supports both old-style processors
    (returning only data or (data,)) and new-style (returning (data, artifacts)).

    Examples:

    ```python
    from aboba import tests
    from aboba import splitters
    from aboba import effect_modifiers
    from aboba import processing
    from aboba import pipelines

    import numpy as np
    import pandas as pd
    import scipy.stats as sps

    data_a = sps.norm.rvs(size=1000, loc=0, scale=100)
    data_b = sps.norm.rvs(size=1000, loc=0, scale=100)

    data_a_cov = data_a + sps.norm.rvs(size=1000, loc=0, scale=0.5)
    data_b_cov = data_b + sps.norm.rvs(size=1000, loc=0, scale=0.5)

    data_a_strat = sps.bernoulli.rvs(p=0.15, size=1000)
    data_b_strat = sps.bernoulli.rvs(p=0.15, size=1000)

    # dataset of four columns: value, covariate, strat and b_group
    data = pd.DataFrame({
        'value': np.concatenate([
            data_a,
            data_b,
        ]),
        'covariate': np.concatenate([
            data_a_cov,
            data_b_cov,
        ]),
        'strat': np.concatenate([
            data_a_strat,
            data_b_strat,
        ]),
        'b_group': np.concatenate([
            np.repeat(0, 1000),
            np.repeat(1, 1000),
        ]),
    })

    cuped_preprocess = processing.CupedProcessor(
        value_column='value',
        covariate_columns='covariate',
        result_column='value',
        group_column='b_group',
        group_test=1,
        group_control=0,
    )

    ensurecol_preprocess = processing.EnsureColsProcessor(cols=['value', 'covariate'])

    # `splitter` should be a splitter instance from `aboba.splitters`
    pipeline_cuped = pipelines.Pipeline([
        ('cuped', cuped_preprocess),
        ('splitter', splitter),
        ('ensurecols', ensurecol_preprocess)
    ])

    pipeline_cuped.fit(data)

    result_data, artifacts = pipeline_cuped.transform(data)

    # Or if you don't need artifacts:
    result_data, _ = pipeline_cuped.transform(data)

    # Check for CUPED artifacts
    if 'cuped_original_control_mean' in artifacts:
        print(f"Original control mean: {artifacts['cuped_original_control_mean']}")
    ```
    """

    def __init__(self, steps: List[Union[Tuple[str, Union[BaseDataSplitter, BaseDataProcessor]],
                                         BaseDataSplitter,
                                         BaseDataProcessor]]):
        """
        Initialize the pipeline with a list of transformers.

        Args:
            steps: List of transformers. Each step can be:
                - A tuple (name, transformer)
                - Just a transformer (name will be auto-generated from its class name)

        Raises:
            TypeError: If any step is neither a BaseDataSplitter nor a BaseDataProcessor.
        """
        self.names = []
        self.transformers = []

        for step in steps:
            if isinstance(step, tuple):
                if len(step) > 1:
                    step_name, step_transformer = step[0], step[1]
                else:
                    # One-element tuple: treat it like a bare transformer.
                    step_transformer = step[0]
                    step_name = type(step_transformer).__name__
            else:
                step_transformer = step
                step_name = type(step).__name__

            if not isinstance(step_transformer, (BaseDataSplitter, BaseDataProcessor)):
                raise TypeError("Object must be Splitter or Processor to be in pipeline")

            self.transformers.append(step_transformer)
            self.names.append(step_name)

    @staticmethod
    def _resolve_datasource(data):
        """
        Resolve a DataSource to a concrete pd.DataFrame or list of DataFrames.

        Supports pd.DataFrame, List[pd.DataFrame], and any callable (including
        BaseDataSource instances) that returns a pd.DataFrame.
        """
        if callable(data):
            return data()
        return data

    @staticmethod
    def _merge_artifacts(target: Dict[str, Any], source: Optional[Dict[str, Any]]) -> None:
        """
        Merge artifacts from source into target dictionary.

        Handles None and non-dict sources gracefully (they are ignored).

        Args:
            target: Target dictionary to update (modified in-place)
            source: Source dictionary to merge from (can be None or non-dict)
        """
        # isinstance(None, dict) is False, so a separate None check is unnecessary.
        if isinstance(source, dict):
            target.update(source)

    @staticmethod
    def _unpack_transform_result(result) -> Tuple[Any, Optional[Dict[str, Any]]]:
        """
        Normalize a processor's transform() return value into (data, artifacts).

        Supports old-style processors returning bare data or a 1-tuple (data,),
        and new-style processors returning a 2-tuple (data, artifacts).

        Args:
            result: Whatever transform() returned.

        Returns:
            Tuple of (data, artifacts) where artifacts is None for old-style results.

        Raises:
            IndexError: If a processor returns an empty tuple (it must at least
                return the data).
        """
        if isinstance(result, tuple):
            if len(result) == 2:
                return result[0], result[1]
            # 1-tuple (or longer, malformed tuple): first element is the data.
            return result[0], None
        return result, None

    def fit(self, data):
        """
        Fit every transformer in order, feeding each one the data as transformed
        by the preceding steps. Artifacts produced during fitting are discarded.

        Args:
            data: Input data (pd.DataFrame, List[pd.DataFrame], or callable returning such)

        Returns:
            self, to allow chaining (e.g. pipeline.fit(data).transform(data)).
        """
        data = Pipeline._resolve_datasource(data)

        for transformer in self.transformers:
            transformer.fit(data)

            if isinstance(transformer, BaseDataProcessor):
                if isinstance(data, pd.DataFrame):
                    data, _ = Pipeline._unpack_transform_result(transformer.transform(data))

                elif isinstance(data, list):
                    from aboba.processing.linearization_processor import LinearizationProcessor

                    if isinstance(transformer, LinearizationProcessor):
                        # LinearizationProcessor consumes the whole list of groups at once.
                        data, _ = Pipeline._unpack_transform_result(transformer.transform(data))
                    else:
                        # Regular processors are applied to each split group independently.
                        data = [
                            Pipeline._unpack_transform_result(transformer.transform(group))[0]
                            for group in data
                        ]

            elif isinstance(transformer, BaseDataSplitter):
                # Data is only a list after a splitter has already run, so a
                # non-DataFrame here means a second splitter was encountered.
                if not isinstance(data, pd.DataFrame):
                    raise TypeError("Only one Splitter is allowed in pipeline")

                data = transformer.sample(data, None)
            else:
                # Unreachable if __init__ validated the steps; kept as a safety net.
                raise TypeError("Object must be Splitter or Processor to be in pipeline")

        return self

    def transform(self, data) -> Tuple[Union[pd.DataFrame, List[pd.DataFrame]], Dict[str, Any]]:
        """
        Apply all transformers in the pipeline to the provided data.

        Accumulates artifacts from all processors and returns them along with transformed data.
        Supports both old-style processors (returning only data) and new-style (returning (data, artifacts)).

        Args:
            data: Input data (pd.DataFrame, List[pd.DataFrame], or callable returning such)

        Returns:
            Tuple[Union[pd.DataFrame, List[pd.DataFrame]], Dict[str, Any]]:
                - Transformed data (single DataFrame or list of DataFrames after splitting)
                - Accumulated artifacts dictionary from all processors

        Raises:
            TypeError: If the data reaching a processor is neither a DataFrame nor
                a list, or if more than one splitter is encountered.

        Examples:
            ```python
            # Get both data and artifacts
            transformed_data, artifacts = pipeline.transform(data)

            # If you don't need artifacts
            transformed_data, _ = pipeline.transform(data)

            # Check for CUPED artifacts
            if 'cuped_original_control_mean' in artifacts:
                print(f"Original control mean: {artifacts['cuped_original_control_mean']}")
            ```
        """
        data = Pipeline._resolve_datasource(data)

        # Artifacts from every processor are merged into this single dict.
        accumulated_artefacts = {}

        for transformer in self.transformers:
            if isinstance(transformer, BaseDataProcessor):
                if isinstance(data, pd.DataFrame):
                    data, new_artefacts = Pipeline._unpack_transform_result(
                        transformer.transform(data)
                    )
                    Pipeline._merge_artifacts(accumulated_artefacts, new_artefacts)

                elif isinstance(data, list):
                    from aboba.processing.linearization_processor import LinearizationProcessor

                    if isinstance(transformer, LinearizationProcessor):
                        # LinearizationProcessor consumes the whole list of groups at once.
                        data, new_artefacts = Pipeline._unpack_transform_result(
                            transformer.transform(data)
                        )
                        Pipeline._merge_artifacts(accumulated_artefacts, new_artefacts)
                    else:
                        processed = [
                            Pipeline._unpack_transform_result(transformer.transform(group))
                            for group in data
                        ]
                        data = [group_data for group_data, _ in processed]

                        # NOTE(review): only the FIRST group's artifacts are merged,
                        # matching the original behavior; later groups' artifacts
                        # are intentionally dropped to avoid key collisions.
                        if processed:
                            Pipeline._merge_artifacts(accumulated_artefacts, processed[0][1])

                else:
                    raise TypeError("Data must be pd.DataFrame or List[pd.DataFrame] to be handled by Pipeline")

            elif isinstance(transformer, BaseDataSplitter):
                # Data is only a list after a splitter has already run, so a
                # non-DataFrame here means a second splitter was encountered.
                if not isinstance(data, pd.DataFrame):
                    raise TypeError("Only one Splitter is allowed in pipeline")

                data = transformer.sample(data, accumulated_artefacts)

        return data, accumulated_artefacts

    def fit_transform(self, data) -> Tuple[Union[pd.DataFrame, List[pd.DataFrame]], Dict[str, Any]]:
        """
        Fit the pipeline and transform data in one step.

        Args:
            data: Input data

        Returns:
            Tuple[Union[pd.DataFrame, List[pd.DataFrame]], Dict[str, Any]]:
                Transformed data and accumulated artifacts
        """
        self.fit(data)
        return self.transform(data)

__init__

__init__(steps: List[Union[Tuple[str, Union[BaseDataSplitter, BaseDataProcessor]], BaseDataSplitter, BaseDataProcessor]])

Initialize the pipeline with a list of transformers.

PARAMETER DESCRIPTION
steps

List of transformers. Each step can be: - A tuple (name, transformer) - Just a transformer (name will be auto-generated)

TYPE: List[Union[Tuple[str, Union[BaseDataSplitter, BaseDataProcessor]], BaseDataSplitter, BaseDataProcessor]]

Source code in aboba/pipeline/pipeline.py
def __init__(self, steps: List[Union[Tuple[str, Union[BaseDataSplitter, BaseDataProcessor]], 
                                     BaseDataSplitter, 
                                     BaseDataProcessor]]):
    """
    Initialize the pipeline with a list of transformers.

    Args:
        steps: List of transformers. Each step can be:
            - A tuple (name, transformer)
            - Just a transformer (name will be auto-generated from its class name)
    """
    self.names = []
    self.transformers = []

    for step in steps:
        if isinstance(step, tuple):
            if len(step) > 1:
                # (name, transformer) pair.
                step_name, step_transformer = step[0], step[1]
            else:
                # One-element tuple: behave as if a bare transformer was given.
                step_transformer = step[0]
                step_name = type(step_transformer).__name__
        else:
            # Bare transformer: derive the name from its class.
            step_transformer = step
            step_name = type(step).__name__

        self.transformers.append(step_transformer)
        self.names.append(step_name)

        # Reject anything that is neither a splitter nor a processor.
        if not isinstance(self.transformers[-1], (BaseDataSplitter, BaseDataProcessor)):
            raise TypeError("Object must be Splitter or Processor to be in pipeline")
transform

transform(data) -> Tuple[Union[pd.DataFrame, List[pd.DataFrame]], Dict[str, Any]]

Apply all transformers in the pipeline to the provided data.

Accumulates artifacts from all processors and returns them along with transformed data. Supports both old-style processors (returning only data) and new-style (returning (data, artifacts)).

PARAMETER DESCRIPTION
data

Input data (pd.DataFrame, List[pd.DataFrame], or callable returning such)

RETURNS DESCRIPTION
Tuple[Union[DataFrame, List[DataFrame]], Dict[str, Any]]

Tuple[Union[pd.DataFrame, List[pd.DataFrame]], Dict[str, Any]]: - Transformed data (single DataFrame or list of DataFrames after splitting) - Accumulated artifacts dictionary from all processors

Examples:

# Get both data and artifacts
transformed_data, artifacts = pipeline.transform(data)

# If you don't need artifacts
transformed_data, _ = pipeline.transform(data)

# Check for CUPED artifacts
if 'cuped_original_control_mean' in artifacts:
    print(f"Original control mean: {artifacts['cuped_original_control_mean']}")
Source code in aboba/pipeline/pipeline.py
def transform(self, data) -> Tuple[Union[pd.DataFrame, List[pd.DataFrame]], Dict[str, Any]]:
    """
    Apply all transformers in the pipeline to the provided data.

    Accumulates artifacts from all processors and returns them along with transformed data.
    Supports both old-style processors (returning only data) and new-style (returning (data, artifacts)).

    Args:
        data: Input data (pd.DataFrame, List[pd.DataFrame], or callable returning such)

    Returns:
        Tuple[Union[pd.DataFrame, List[pd.DataFrame]], Dict[str, Any]]: 
            - Transformed data (single DataFrame or list of DataFrames after splitting)
            - Accumulated artifacts dictionary from all processors

    Examples:
        ```python
        # Get both data and artifacts
        transformed_data, artifacts = pipeline.transform(data)

        # If you don't need artifacts
        transformed_data, _ = pipeline.transform(data)

        # Check for CUPED artifacts
        if 'cuped_original_control_mean' in artifacts:
            print(f"Original control mean: {artifacts['cuped_original_control_mean']}")
        ```
    """
    def _unpack(result):
        # Normalize a processor's return value into (data, artifacts-or-None):
        # 2-tuple -> as-is, other tuple -> (first element, None), bare -> (result, None).
        if isinstance(result, tuple):
            if len(result) == 2:
                return result[0], result[1]
            return result[0], None
        return result, None

    data = Pipeline._resolve_datasource(data)

    # All processor artifacts are merged into this single dict.
    collected = {}

    for step in self.transformers:
        if isinstance(step, BaseDataProcessor):
            if isinstance(data, pd.DataFrame):
                # Single DataFrame: transform it directly.
                data, extra = _unpack(step.transform(data))
                Pipeline._merge_artifacts(collected, extra)

            elif isinstance(data, list):
                from aboba.processing.linearization_processor import LinearizationProcessor

                if isinstance(step, LinearizationProcessor):
                    # LinearizationProcessor handles the whole list of groups at once.
                    data, extra = _unpack(step.transform(data))
                    Pipeline._merge_artifacts(collected, extra)
                else:
                    # Apply the processor to each split group independently.
                    pairs = [_unpack(step.transform(group)) for group in data]
                    data = [frame for frame, _ in pairs]

                    # Only the first group's artifacts are merged (matches
                    # historical behavior; avoids key collisions across groups).
                    if pairs:
                        Pipeline._merge_artifacts(collected, pairs[0][1])

            else:
                raise TypeError("Data must be pd.DataFrame or List[pd.DataFrame] to be handled by Pipeline")

        elif isinstance(step, BaseDataSplitter):
            # Data becomes a list only after a splitter ran, so a non-DataFrame
            # here means a second splitter was encountered.
            if not isinstance(data, pd.DataFrame):
                raise TypeError("Only one Splitter is allowed in pipeline")

            data = step.sample(data, collected)

    return data, collected

fit_transform

fit_transform(data) -> Tuple[Union[pd.DataFrame, List[pd.DataFrame]], Dict[str, Any]]

Fit the pipeline and transform data in one step.

PARAMETER DESCRIPTION
data

Input data

RETURNS DESCRIPTION
Tuple[Union[DataFrame, List[DataFrame]], Dict[str, Any]]

Tuple[Union[pd.DataFrame, List[pd.DataFrame]], Dict[str, Any]]: Transformed data and accumulated artifacts

Source code in aboba/pipeline/pipeline.py
def fit_transform(self, data) -> Tuple[Union[pd.DataFrame, List[pd.DataFrame]], Dict[str, Any]]:
    """
    Fit the pipeline and transform data in one step.

    Args:
        data: Input data

    Returns:
        Tuple[Union[pd.DataFrame, List[pd.DataFrame]], Dict[str, Any]]: 
            Transformed data and accumulated artifacts
    """
    # fit() returns self, so the two stages chain fluently.
    return self.fit(data).transform(data)