Обработка данных
Обработчики данных для снижения дисперсии и преобразования данных.
BucketProcessor
Процессор для присвоения бакетов
Назначает строкам номера бакетов на основе значения указанной колонки (через хеш). Полезно для детерминированного разбиения пользователей/объектов на бакеты с последующей агрегацией.
Инициализация
Параметры
- source_column (str): Колонка, по значению которой считается хеш и определяется бакет.
- result_column (str): Колонка, в которую будет записан номер бакета.
- n_buckets (int): Количество бакетов.
Методы
transform
Добавляет в данные колонку с номерами бакетов.
Параметры:
- data (pd.DataFrame): Исходные данные для обработки.
Возвращает:
- Tuple[pd.DataFrame, dict]: Обновлённый DataFrame и словарь артефактов (в данном процессоре пустой).
Примеры использования
import pandas as pd
import numpy as np
from aboba.processing import BucketProcessor
# Создаем данные с user_id
np.random.seed(42)
data = pd.DataFrame({
'user_id': [f"user_{i}" for i in range(10)],
'value': np.random.normal(100, 15, 10),
})
# Назначаем пользователей в 3 бакета по user_id
processor = BucketProcessor(
source_column='user_id',
result_column='bucket',
n_buckets=3,
)
processed, _ = processor.transform(data)
print(processed)
MeanByBucketProcessor
Процессор усреднения по бакетам
Агрегирует данные на уровне бакетов, вычисляя среднее значение метрики в каждом бакете. Если в данных присутствует колонка группы, усреднение считается отдельно для каждой пары (bucket, group).
Инициализация
def __init__(
self,
value_column: str = "purchase_amount",
bucket_column: str = "bucket",
group_column: Optional[str] = "group",
)
Параметры
- value_column (str): Название колонки со значениями метрики.
- bucket_column (str): Название колонки с идентификатором бакета.
- group_column (Optional[str]): Название колонки с группой. Если
Noneили такой колонки нет в данных, усреднение выполняется только по бакетам. По умолчанию"group".
Методы
transform
Вычисляет среднее значение метрики по бакетам.
Параметры:
- data (pd.DataFrame): Исходные данные.
Возвращает:
- Tuple[pd.DataFrame, Dict[str, Any]]: DataFrame с одной строкой на бакет или на пару
(bucket, group)и пустой словарь артефактов.
Примеры использования
import pandas as pd
from aboba.processing import MeanByBucketProcessor
data = pd.DataFrame({
'bucket': [0, 0, 1, 1, 2, 2],
'group': [0, 1, 0, 1, 0, 1],
'metric': [10.0, 12.0, 8.0, 9.0, 11.0, 13.0],
})
processor = MeanByBucketProcessor(
value_column='metric',
bucket_column='bucket',
group_column='group',
)
processed, _ = processor.transform(data)
print(processed)
CupedProcessor
CUPED процессор для снижения дисперсии
Применяет метод CUPED (Controlled-experiment Using Pre-Experiment Data) для снижения дисперсии метрики с использованием одной или нескольких ковариат. Значительно повышает чувствительность A/B тестов.
Реализация основана на линейной регрессии без переменной тритмента: подбирается модель Y = a + bᵀX, а трансформация вычисляется как Z = Y - bᵀX (свободный коэффициент не вычитается). Это сохраняет интерпретируемость среднего значения Z и корректно обрабатывает несколько ковариат совместно.
Инициализация
def __init__(
self,
value_column: str,
covariate_columns: Union[str, List[str]],
result_column: str,
group_column: Optional[str] = None,
group_test=None,
group_control=None
)
Параметры
- value_column (str): Название колонки с целевой метрикой.
- covariate_columns (Union[str, List[str]]): Название одной ковариаты (строка) или список названий нескольких ковариат. Поддерживается множественная регрессия.
- result_column (str): Название колонки для сохранения результата CUPED-преобразования.
- group_column (Optional[str]): Устарело; сохранено для обратной совместимости. Регрессия всегда подбирается по всем данным без переменной группы. По умолчанию
None. - group_test, group_control: Устарело; сохранено для обратной совместимости.
Методы
transform
Применяет CUPED-преобразование к данным.
Параметры:
- data (pd.DataFrame): Данные для обработки.
Возвращает:
- Tuple[pd.DataFrame, dict]: Данные с новой колонкой
result_columnи пустой словарь артефактов.
Примеры использования
import pandas as pd
import numpy as np
from aboba.processing import CupedProcessor
np.random.seed(42)
n = 1000
x1 = np.random.normal(100, 20, n)
x2 = 0.7 * x1 + np.random.normal(0, 8, n)
data = pd.DataFrame({
'group': np.repeat([0, 1], n // 2),
'x1': x1,
'x2': x2,
'target': 2 * x1 - x2 + np.random.normal(0, 10, n),
})
# Одна ковариата
processor_1 = CupedProcessor(
value_column='target',
covariate_columns='x1',
result_column='target_c1',
)
out1, _ = processor_1.transform(data.copy())
print(f"Дисперсия (1 ковариата): {out1['target_c1'].var():.2f}")
# Две ковариаты — лучше снижает дисперсию
processor_2 = CupedProcessor(
value_column='target',
covariate_columns=['x1', 'x2'],
result_column='target_c2',
)
out2, _ = processor_2.transform(data.copy())
print(f"Дисперсия (2 ковариаты): {out2['target_c2'].var():.2f}")
print(f"Исходная дисперсия: {data['target'].var():.2f}")
LinearizationProcessor
Процессор линеаризации ratio-метрик
Преобразует ratio-метрику в линейную метрику на уровне пользователя. Это удобно для последующего применения обычных тестов сравнения средних к отношениям вида "числитель / знаменатель".
Процессор:
- вычисляет коэффициент
kappaпо контрольной и тестовой группам; - агрегирует данные до уровня
(group_column, user_name); - создаёт новую колонку с линеаризованным значением.
Однопериодный режим (по умолчанию): period_column=None — одна kappa на всех строках входа.
Двухпериодный режим: если задан period_column, данные режутся на пилот и препилот; для каждого среза считается своя kappa, линеаризация и user-level агрегация, затем результат объединяется по (group_column, user_name). Колонка result_name — линеаризованная метрика пилота, prepilot_result_name — препилота (удобно как ковариата для CUPED). Суммы числителя и знаменателя после merge получают суффиксы _pilot и _prepilot. По умолчанию merge_how="inner" (только пользователи, присутствующие в обоих периодах).
Вход списком из двух DataFrame (например, выход UserSplitter): куски склеиваются вертикально; первому присваивается group_control, второму — group_test (колонка group_column может отсутствовать в кусках — она создаётся). После линеаризации результат снова делится на два фрейма [контроль, тест] в том же порядке, чтобы дальше можно было вызывать тесты вроде CupedLinearRegressionTTest без GroupSplitter. В Pipeline после сплиттера LinearizationProcessor вызывается один раз на весь список (специальная ветка).
Если denominator_name не задан, используется временный знаменатель, заполненный 1.0, что позволяет применять процессор и к обычным не-ratio метрикам.
Инициализация
def __init__(
self,
numerator_name: str = "session_lengths",
denominator_name: Optional[str] = None,
user_name: str = "user_id",
group_column: str = "group",
group_control: Any = 0,
group_test: Any = 1,
result_name: str = "linearized_values",
eta: float = 0.0,
period_column: Optional[str] = None,
period_pilot: Any = None,
period_prepilot: Any = None,
prepilot_result_name: str = "linearized_prepilot",
merge_how: str = "inner",
)
Параметры
- numerator_name (str): Название колонки с числителем ratio-метрики.
- denominator_name (Optional[str]): Название колонки со знаменателем. Если
None, создаётся временный знаменатель со значением1.0. По умолчаниюNone. - user_name (str): Название колонки с идентификатором пользователя. По умолчанию
"user_id". - group_column (str): Название колонки с меткой группы. По умолчанию
"group". - group_control (Any): Значение контрольной группы. По умолчанию
0. - group_test (Any): Значение тестовой группы. По умолчанию
1. - result_name (str): Название колонки для сохранения линеаризованной метрики (пилот в двухпериодном режиме). По умолчанию
"linearized_values". - eta (float): Вес тестовой группы при вычислении
kappa. По умолчанию0.0. - period_column (Optional[str]): Колонка периода; если не
None, включается двухпериодный режим. - period_pilot / period_prepilot (Any): Значения в
period_columnдля пилота и препилота; обязательны, если заданperiod_column. Должны различаться. - prepilot_result_name (str): Имя колонки с линеаризованной метрикой препилота. Не должно совпадать с
result_nameпри двухпериодном режиме. - merge_how (str):
"inner"или"outer"— способmergeпо(group_column, user_name).
Методы
transform
def transform(
self, data: Union[pd.DataFrame, List[pd.DataFrame]]
) -> Tuple[Union[pd.DataFrame, List[pd.DataFrame]], Dict[str, float]]
Агрегирует данные на уровне пользователя и рассчитывает линеаризованную метрику.
Параметры:
- data (
pd.DataFrameили ровно дваDataFrameв списке): Исходные данные. Список используется послеUserSplitter(см. выше).
Возвращает:
- Tuple[Union[pd.DataFrame, List[pd.DataFrame]], Dict[str, float]]: один агрегированный
DataFrameили список из двух (контроль и тест), если вход был списком. Колонкиresult_name/prepilot_result_nameкак в однопериодном и двухпериодном режимах. Артефакты: в однопериодном режиме —kappa; в двухпериодном —kappa_pilot,kappa_prepilotиkappa(то же, чтоkappa_pilot).
Исключения:
- ValueError: Если отсутствуют обязательные колонки.
- ValueError: Если в данных отсутствует контрольная или тестовая группа (в срезе или во всём датафрейме).
- ValueError: Неверный
merge_how, не заданыperiod_pilot/period_prepilotприperiod_column, совпадение имёнresult_nameиprepilot_result_name, пустой срез пилота или препилота.
Примеры использования
Однопериодный режим:
import pandas as pd
from aboba.processing import LinearizationProcessor
data = pd.DataFrame({
'user_id': [1, 1, 2, 2, 3, 3, 4, 4],
'group': [0, 0, 0, 0, 1, 1, 1, 1],
'clicks': [1, 2, 0, 1, 3, 1, 2, 2],
'views': [10, 11, 9, 8, 12, 10, 11, 9],
})
processor = LinearizationProcessor(
numerator_name='clicks',
denominator_name='views',
user_name='user_id',
group_column='group',
result_name='linearized_clicks',
)
processed, artefacts = processor.transform(data)
print(processed.head())
print(artefacts['kappa'])
Двухпериодный режим на сырых сессиях без колонки group: UserSplitter делит пользователей на два куска, затем LinearizationProcessor в Pipeline вызывается один раз на список из двух DataFrame, внутри объединяет плечи, линеаризует и возвращает снова два user-level фрейма [контроль, тест] — так можно подать данные в CupedLinearRegressionTTest без GroupSplitter. Если в данных уже есть group, передайте в линеаризацию один DataFrame как раньше.
from aboba.processing import LinearizationProcessor
from aboba.pipeline import Pipeline
from aboba.splitters.user_splitter import UserSplitter
# sessions: period (0=препилот, 1=пилот), user_id, clicks, views — без group
user_split = UserSplitter(size=None, user_column='user_id', add_group_column=None)
lin = LinearizationProcessor(
numerator_name='clicks',
denominator_name='views',
period_column='period',
period_pilot=1,
period_prepilot=0,
result_name='linearized_values',
prepilot_result_name='lin_pre',
)
pipeline = Pipeline([
('users', user_split),
('lin', lin),
])
# control_df, test_df = pipeline.transform(sessions)[0] # список из двух таблиц
# далее CupedLinearRegressionTTest(covariate_names=['lin_pre'], value_column='linearized_values', ...)
Для пулового преобразования CupedProcessor нужен один общий DataFrame; его можно собрать из объединённых сессий с колонкой group и отдельным пайплайном, либо опираться на регрессионный CUPED в тесте.
RegressionProcessor
Процессор на основе регрессии (в данный момент отключён)
В исходном коде текущей версии RegressionProcessor помечен как не реализованный (NotImplementedError) и не предназначен для использования в продакшене.
Интерфейс может измениться в будущих версиях, поэтому примеры использования здесь не приводятся намеренно, чтобы не вводить в заблуждение.
Для снижения дисперсии с помощью ковариат в актуальной версии рекомендуется использовать CupedProcessor или тест CupedLinearRegressionTTest.
RenameColsProcessor
Процессор для переименования колонок
Переименовывает колонки в DataFrame согласно заданному маппингу. Полезно для стандартизации названий колонок перед применением тестов.
Инициализация
Параметры
- rename_map (Dict[str, str]): Словарь для переименования колонок (старое_имя: новое_имя).
Методы
transform
Переименовывает колонки в данных.
Параметры:
- data (pd.DataFrame): Исходные данные.
Возвращает:
- Tuple[pd.DataFrame, dict]: Данные с переименованными колонками и словарь артефактов (пустой).
Примеры использования
import pandas as pd
from aboba.processing import RenameColsProcessor
# Создаем данные с нестандартными названиями
data = pd.DataFrame({
'user_metric': [1, 2, 3, 4, 5],
'exp_group': ['A', 'B', 'A', 'B', 'A']
})
# Переименовываем колонки
processor = RenameColsProcessor({
'user_metric': 'target',
'exp_group': 'group'
})
processed, _ = processor.transform(data)
print(f"Исходные колонки: {list(data.columns)}")
print(f"Новые колонки: {list(processed.columns)}")
EnsureColsProcessor
Процессор для проверки наличия колонок
Проверяет наличие необходимых колонок в данных и выбрасывает исключение, если какие-то колонки отсутствуют. Используется для валидации данных перед обработкой.
Инициализация
Параметры
- cols (List[str]): Список обязательных колонок.
Методы
transform
Проверяет наличие обязательных колонок.
Параметры:
- data (pd.DataFrame): Данные для проверки.
Возвращает:
- Tuple[pd.DataFrame, dict]: Исходные данные (если все колонки присутствуют) и словарь артефактов (пустой).
Исключения: - ValueError: Если какие-то обязательные колонки отсутствуют.
Примеры использования
import pandas as pd
from aboba.processing import EnsureColsProcessor
# Создаем данные
data = pd.DataFrame({
'target': [1, 2, 3],
'group': ['A', 'B', 'A']
})
# Проверяем наличие колонок
processor = EnsureColsProcessor(['target', 'group'])
processed, _ = processor.transform(data) # OK
# Попытка с отсутствующей колонкой
try:
processor_strict = EnsureColsProcessor(['target', 'group', 'missing_col'])
processor_strict.transform(data)
except AssertionError as e:
print(f"Ошибка: {e}")
PreprocessorPipeline
Пайплайн препроцессоров
Объединяет несколько процессоров в единый пайплайн для последовательного применения. Упрощенная версия Pipeline, специально для препроцессинга.
Инициализация
Параметры
- steps (List[BaseDataProcessor]): Список процессоров для последовательного применения.
- verbose (bool): Логировать прогресс через
tqdm. По умолчаниюFalse.
Методы
fit
Обучает все процессоры в пайплайне на данных (по очереди, передавая выход одного шага в следующий).
transform
Применяет все процессоры к данным (предполагается, что fit уже был вызван).
fit_transform
Комбинация fit и transform за один вызов.
Примеры использования
import pandas as pd
import numpy as np
from aboba.processing import (
PreprocessorPipeline,
RenameColsProcessor,
EnsureColsProcessor,
CupedProcessor
)
# Создаем данные
np.random.seed(42)
data = pd.DataFrame({
'metric': np.random.normal(100, 15, 1000),
'pre_metric': np.random.normal(100, 15, 1000),
'exp_group': np.random.choice(['control', 'test'], 1000)
})
# Создаем пайплайн препроцессоров
pipeline = PreprocessorPipeline([
RenameColsProcessor({'metric': 'target', 'exp_group': 'group', 'pre_metric': 'covariate'}),
EnsureColsProcessor(['target', 'group', 'covariate']),
CupedProcessor(
value_column='target',
covariate_columns='covariate',
result_column='target',
),
])
# Обучаем и применяем пайплайн
processed, _ = pipeline.fit_transform(data)