Перейти к содержанию

Пайплайн

Пайплайн для последовательного объединения нескольких операций.

Pipeline

Исходный код

Класс для создания пайплайна обработки данных

Позволяет объединить несколько процессоров данных в единый пайплайн, который последовательно применяет все операции к данным. Упрощает организацию сложных цепочек преобразований данных.

Инициализация

def __init__(
    self,
    steps: List[Tuple[str, Union[BaseDataSplitter, BaseDataProcessor]]]
)

Параметры

  • steps (List[Tuple[str, Union[BaseDataSplitter, BaseDataProcessor]]]): Список кортежей, где каждый кортеж содержит название шага и процессор данных или сэмплер.

Методы

fit

def fit(self, data: pd.DataFrame) -> 'Pipeline'

Обучает все процессоры в пайплайне на данных, последовательно применяя их к датафрейму (и, при наличии сэмплера, превращая данные в список групп).

Параметры:

  • data (pd.DataFrame): Данные для обучения процессоров.

Возвращает:

  • Pipeline: Обученный пайплайн (self).

transform

def transform(self, data: Union[pd.DataFrame, List[pd.DataFrame]]) -> Union[pd.DataFrame, List[pd.DataFrame]]

Последовательно применяет все шаги пайплайна к данным (без дополнительного обучения).

Параметры:

  • data (Union[pd.DataFrame, List[pd.DataFrame]]): Исходные данные или список групп.

Возвращает:

  • Union[pd.DataFrame, List[pd.DataFrame]]: Обработанные данные или список групп.

Примеры использования

import pandas as pd
import numpy as np
from aboba.pipeline import Pipeline
from aboba.processing import CupedProcessor, RenameColsProcessor

# Создаем тестовые данные
np.random.seed(42)
data = pd.DataFrame({
    'user_id': range(100),
    'metric': np.random.normal(100, 15, 100),
    'covariate': np.random.normal(50, 10, 100),
    'group': ['control'] * 50 + ['test'] * 50
})

# Создаем пайплайн из нескольких процессоров
pipeline = Pipeline([
    ('rename', RenameColsProcessor({'metric': 'target'})),
    ('cuped', CupedProcessor(
        value_column='target',
        covariate_columns='covariate',
        result_column='target',
        group_column='group',
    ))
])

# Обучаем и применяем пайплайн
pipeline.fit(data)
processed_data = pipeline.transform(data)

print(f"Исходная дисперсия: {data['metric'].var():.2f}")
print(f"Дисперсия после CUPED: {processed_data['target'].var():.2f}")

Использование с экспериментом

from aboba.experiment import AbobaExperiment, ExperimentGroup
from aboba.tests import AbsoluteIndependentTTest
from aboba.splitters import GroupSplitter

# Создаем пайплайн препроцессинга
preprocessing_pipeline = Pipeline([
    ('rename', RenameColsProcessor({'revenue': 'target'})),
    ('cuped', CupedProcessor(
        value_column='target',
        covariate_column='previous_revenue',
        group_column='group'
    ))
])

# Используем в эксперименте
experiment = AbobaExperiment(
    data_source=data_source,
    groups=[
        ExperimentGroup('control', GroupSplitter('group', 'control')),
        ExperimentGroup('test', GroupSplitter('group', 'test'))
    ],
    test=AbsoluteIndependentTTest(value_column='target'),
    preprocessors=[preprocessing_pipeline]
)

result = experiment.run()