Пайплайн
Пайплайн для последовательного объединения нескольких операций.
Pipeline
Класс для создания пайплайна обработки данных
Позволяет объединить несколько процессоров данных в единый пайплайн, который последовательно применяет все операции к данным. Упрощает организацию сложных цепочек преобразований данных.
Инициализация
Параметры
- steps (List[Tuple[str, Union[BaseDataSplitter, BaseDataProcessor]]]): Список кортежей, где каждый кортеж содержит название шага и процессор данных или сэмплер.
Методы
fit
Обучает все процессоры в пайплайне на данных, последовательно применяя их к датафрейму (и, при наличии сэмплера, превращая данные в список групп).
Параметры:
- data (pd.DataFrame): Данные для обучения процессоров.
Возвращает:
- Pipeline: Обученный пайплайн (self).
transform
def transform(self, data: Union[pd.DataFrame, List[pd.DataFrame]]) -> Union[pd.DataFrame, List[pd.DataFrame]]
Последовательно применяет все шаги пайплайна к данным (без дополнительного обучения).
Параметры:
- data (Union[pd.DataFrame, List[pd.DataFrame]]): Исходные данные или список групп.
Возвращает:
- Union[pd.DataFrame, List[pd.DataFrame]]: Обработанные данные или список групп.
Примеры использования
import pandas as pd
import numpy as np
from aboba.pipeline import Pipeline
from aboba.processing import CupedProcessor, RenameColsProcessor
# Создаем тестовые данные
np.random.seed(42)
data = pd.DataFrame({
'user_id': range(100),
'metric': np.random.normal(100, 15, 100),
'covariate': np.random.normal(50, 10, 100),
'group': ['control'] * 50 + ['test'] * 50
})
# Создаем пайплайн из нескольких процессоров
pipeline = Pipeline([
('rename', RenameColsProcessor({'metric': 'target'})),
('cuped', CupedProcessor(
value_column='target',
covariate_columns='covariate',
result_column='target',
group_column='group',
))
])
# Обучаем и применяем пайплайн
pipeline.fit(data)
processed_data = pipeline.transform(data)
print(f"Исходная дисперсия: {data['metric'].var():.2f}")
print(f"Дисперсия после CUPED: {processed_data['target'].var():.2f}")
Использование с экспериментом
from aboba.experiment import AbobaExperiment, ExperimentGroup
from aboba.tests import AbsoluteIndependentTTest
from aboba.splitters import GroupSplitter
# Создаем пайплайн препроцессинга
preprocessing_pipeline = Pipeline([
('rename', RenameColsProcessor({'revenue': 'target'})),
('cuped', CupedProcessor(
value_column='target',
covariate_column='previous_revenue',
group_column='group'
))
])
# Используем в эксперименте
experiment = AbobaExperiment(
data_source=data_source,
groups=[
ExperimentGroup('control', GroupSplitter('group', 'control')),
ExperimentGroup('test', GroupSplitter('group', 'test'))
],
test=AbsoluteIndependentTTest(value_column='target'),
preprocessors=[preprocessing_pipeline]
)
result = experiment.run()