Перейти к содержанию

Обработка данных

Обработчики данных для снижения дисперсии и преобразования данных.

BucketProcessor

Исходный код

Процессор для присвоения бакетов

Назначает строкам номера бакетов на основе значения указанной колонки (через хеш). Полезно для детерминированного разбиения пользователей/объектов на бакеты с последующей агрегацией.

Инициализация

def __init__(self, source_column: str, result_column: str, n_buckets: int, **kwargs)

Параметры

  • source_column (str): Колонка, по значению которой считается хеш и определяется бакет.
  • result_column (str): Колонка, в которую будет записан номер бакета.
  • n_buckets (int): Количество бакетов.

Методы

transform

def transform(self, data: pd.DataFrame) -> Tuple[pd.DataFrame, dict]

Добавляет в данные колонку с номерами бакетов.

Параметры:

  • data (pd.DataFrame): Исходные данные для обработки.

Возвращает:

  • Tuple[pd.DataFrame, dict]: Обновлённый DataFrame и словарь артефактов (в данном процессоре пустой).

Примеры использования

import pandas as pd
import numpy as np
from aboba.processing import BucketProcessor

# Создаем данные с user_id
np.random.seed(42)
data = pd.DataFrame({
    'user_id': [f"user_{i}" for i in range(10)],
    'value': np.random.normal(100, 15, 10),
})

# Назначаем пользователей в 3 бакета по user_id
processor = BucketProcessor(
    source_column='user_id',
    result_column='bucket',
    n_buckets=3,
)

processed, _ = processor.transform(data)
print(processed)

MeanByBucketProcessor

Исходный код

Процессор усреднения по бакетам

Агрегирует данные на уровне бакетов, вычисляя среднее значение метрики в каждом бакете. Если в данных присутствует колонка группы, усреднение считается отдельно для каждой пары (bucket, group).

Инициализация

def __init__(
    self,
    value_column: str = "purchase_amount",
    bucket_column: str = "bucket",
    group_column: Optional[str] = "group",
)

Параметры

  • value_column (str): Название колонки со значениями метрики.
  • bucket_column (str): Название колонки с идентификатором бакета.
  • group_column (Optional[str]): Название колонки с группой. Если None или такой колонки нет в данных, усреднение выполняется только по бакетам. По умолчанию "group".

Методы

transform

def transform(self, data: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, Any]]

Вычисляет среднее значение метрики по бакетам.

Параметры:

  • data (pd.DataFrame): Исходные данные.

Возвращает:

  • Tuple[pd.DataFrame, Dict[str, Any]]: DataFrame с одной строкой на бакет или на пару (bucket, group) и пустой словарь артефактов.

Примеры использования

import pandas as pd
from aboba.processing import MeanByBucketProcessor

data = pd.DataFrame({
    'bucket': [0, 0, 1, 1, 2, 2],
    'group': [0, 1, 0, 1, 0, 1],
    'metric': [10.0, 12.0, 8.0, 9.0, 11.0, 13.0],
})

processor = MeanByBucketProcessor(
    value_column='metric',
    bucket_column='bucket',
    group_column='group',
)

processed, _ = processor.transform(data)
print(processed)

CupedProcessor

Исходный код

CUPED процессор для снижения дисперсии

Применяет метод CUPED (Controlled-experiment Using Pre-Experiment Data) для снижения дисперсии метрики с использованием одной или нескольких ковариат. Значительно повышает чувствительность A/B тестов.

Реализация основана на линейной регрессии без переменной тритмента: подбирается модель Y = a + bᵀX, а трансформация вычисляется как Z = Y - bᵀX (свободный коэффициент не вычитается). Это сохраняет интерпретируемость среднего значения Z и корректно обрабатывает несколько ковариат совместно.

Инициализация

def __init__(
    self,
    value_column: str,
    covariate_columns: Union[str, List[str]],
    result_column: str,
    group_column: Optional[str] = None,
    group_test=None,
    group_control=None
)

Параметры

  • value_column (str): Название колонки с целевой метрикой.
  • covariate_columns (Union[str, List[str]]): Название одной ковариаты (строка) или список названий нескольких ковариат. Поддерживается множественная регрессия.
  • result_column (str): Название колонки для сохранения результата CUPED-преобразования.
  • group_column (Optional[str]): Устарело; сохранено для обратной совместимости. Регрессия всегда подбирается по всем данным без переменной группы. По умолчанию None.
  • group_test, group_control: Устарело; сохранено для обратной совместимости.

Методы

transform

def transform(self, data: pd.DataFrame) -> Tuple[pd.DataFrame, dict]

Применяет CUPED-преобразование к данным.

Параметры:

  • data (pd.DataFrame): Данные для обработки.

Возвращает:

  • Tuple[pd.DataFrame, dict]: Данные с новой колонкой result_column и пустой словарь артефактов.

Примеры использования

import pandas as pd
import numpy as np
from aboba.processing import CupedProcessor

np.random.seed(42)
n = 1000
x1 = np.random.normal(100, 20, n)
x2 = 0.7 * x1 + np.random.normal(0, 8, n)

data = pd.DataFrame({
    'group':  np.repeat([0, 1], n // 2),
    'x1':     x1,
    'x2':     x2,
    'target': 2 * x1 - x2 + np.random.normal(0, 10, n),
})

# Одна ковариата
processor_1 = CupedProcessor(
    value_column='target',
    covariate_columns='x1',
    result_column='target_c1',
)
out1, _ = processor_1.transform(data.copy())
print(f"Дисперсия (1 ковариата): {out1['target_c1'].var():.2f}")

# Две ковариаты — лучше снижает дисперсию
processor_2 = CupedProcessor(
    value_column='target',
    covariate_columns=['x1', 'x2'],
    result_column='target_c2',
)
out2, _ = processor_2.transform(data.copy())
print(f"Дисперсия (2 ковариаты): {out2['target_c2'].var():.2f}")
print(f"Исходная дисперсия:       {data['target'].var():.2f}")

LinearizationProcessor

Исходный код

Процессор линеаризации ratio-метрик

Преобразует ratio-метрику в линейную метрику на уровне пользователя. Это удобно для последующего применения обычных тестов сравнения средних к отношениям вида "числитель / знаменатель".

Процессор:

  • вычисляет коэффициент kappa по контрольной и тестовой группам;
  • агрегирует данные до уровня (group_column, user_name);
  • создаёт новую колонку с линеаризованным значением.

Однопериодный режим (по умолчанию): period_column=None — одна kappa на всех строках входа.

Двухпериодный режим: если задан period_column, данные режутся на пилот и препилот; для каждого среза считается своя kappa, линеаризация и user-level агрегация, затем результат объединяется по (group_column, user_name). Колонка result_name — линеаризованная метрика пилота, prepilot_result_nameпрепилота (удобно как ковариата для CUPED). Суммы числителя и знаменателя после merge получают суффиксы _pilot и _prepilot. По умолчанию merge_how="inner" (только пользователи, присутствующие в обоих периодах).

Вход списком из двух DataFrame (например, выход UserSplitter): куски склеиваются вертикально; первому присваивается group_control, второму — group_test (колонка group_column может отсутствовать в кусках — она создаётся). После линеаризации результат снова делится на два фрейма [контроль, тест] в том же порядке, чтобы дальше можно было вызывать тесты вроде CupedLinearRegressionTTest без GroupSplitter. В Pipeline после сплиттера LinearizationProcessor вызывается один раз на весь список (специальная ветка).

Если denominator_name не задан, используется временный знаменатель, заполненный 1.0, что позволяет применять процессор и к обычным не-ratio метрикам.

Инициализация

def __init__(
    self,
    numerator_name: str = "session_lengths",
    denominator_name: Optional[str] = None,
    user_name: str = "user_id",
    group_column: str = "group",
    group_control: Any = 0,
    group_test: Any = 1,
    result_name: str = "linearized_values",
    eta: float = 0.0,
    period_column: Optional[str] = None,
    period_pilot: Any = None,
    period_prepilot: Any = None,
    prepilot_result_name: str = "linearized_prepilot",
    merge_how: str = "inner",
)

Параметры

  • numerator_name (str): Название колонки с числителем ratio-метрики.
  • denominator_name (Optional[str]): Название колонки со знаменателем. Если None, создаётся временный знаменатель со значением 1.0. По умолчанию None.
  • user_name (str): Название колонки с идентификатором пользователя. По умолчанию "user_id".
  • group_column (str): Название колонки с меткой группы. По умолчанию "group".
  • group_control (Any): Значение контрольной группы. По умолчанию 0.
  • group_test (Any): Значение тестовой группы. По умолчанию 1.
  • result_name (str): Название колонки для сохранения линеаризованной метрики (пилот в двухпериодном режиме). По умолчанию "linearized_values".
  • eta (float): Вес тестовой группы при вычислении kappa. По умолчанию 0.0.
  • period_column (Optional[str]): Колонка периода; если не None, включается двухпериодный режим.
  • period_pilot / period_prepilot (Any): Значения в period_column для пилота и препилота; обязательны, если задан period_column. Должны различаться.
  • prepilot_result_name (str): Имя колонки с линеаризованной метрикой препилота. Не должно совпадать с result_name при двухпериодном режиме.
  • merge_how (str): "inner" или "outer" — способ merge по (group_column, user_name).

Методы

transform

def transform(
    self, data: Union[pd.DataFrame, List[pd.DataFrame]]
) -> Tuple[Union[pd.DataFrame, List[pd.DataFrame]], Dict[str, float]]

Агрегирует данные на уровне пользователя и рассчитывает линеаризованную метрику.

Параметры:

  • data (pd.DataFrame или ровно два DataFrame в списке): Исходные данные. Список используется после UserSplitter (см. выше).

Возвращает:

  • Tuple[Union[pd.DataFrame, List[pd.DataFrame]], Dict[str, float]]: один агрегированный DataFrame или список из двух (контроль и тест), если вход был списком. Колонки result_name / prepilot_result_name как в однопериодном и двухпериодном режимах. Артефакты: в однопериодном режиме — kappa; в двухпериодном — kappa_pilot, kappa_prepilot и kappa (то же, что kappa_pilot).

Исключения:

  • ValueError: Если отсутствуют обязательные колонки.
  • ValueError: Если в данных отсутствует контрольная или тестовая группа (в срезе или во всём датафрейме).
  • ValueError: Неверный merge_how, не заданы period_pilot / period_prepilot при period_column, совпадение имён result_name и prepilot_result_name, пустой срез пилота или препилота.

Примеры использования

Однопериодный режим:

import pandas as pd
from aboba.processing import LinearizationProcessor

data = pd.DataFrame({
    'user_id': [1, 1, 2, 2, 3, 3, 4, 4],
    'group': [0, 0, 0, 0, 1, 1, 1, 1],
    'clicks': [1, 2, 0, 1, 3, 1, 2, 2],
    'views': [10, 11, 9, 8, 12, 10, 11, 9],
})

processor = LinearizationProcessor(
    numerator_name='clicks',
    denominator_name='views',
    user_name='user_id',
    group_column='group',
    result_name='linearized_clicks',
)

processed, artefacts = processor.transform(data)
print(processed.head())
print(artefacts['kappa'])

Двухпериодный режим на сырых сессиях без колонки group: UserSplitter делит пользователей на два куска, затем LinearizationProcessor в Pipeline вызывается один раз на список из двух DataFrame, внутри объединяет плечи, линеаризует и возвращает снова два user-level фрейма [контроль, тест] — так можно подать данные в CupedLinearRegressionTTest без GroupSplitter. Если в данных уже есть group, передайте в линеаризацию один DataFrame как раньше.

from aboba.processing import LinearizationProcessor
from aboba.pipeline import Pipeline
from aboba.splitters.user_splitter import UserSplitter

# sessions: period (0=препилот, 1=пилот), user_id, clicks, views — без group
user_split = UserSplitter(size=None, user_column='user_id', add_group_column=None)
lin = LinearizationProcessor(
    numerator_name='clicks',
    denominator_name='views',
    period_column='period',
    period_pilot=1,
    period_prepilot=0,
    result_name='linearized_values',
    prepilot_result_name='lin_pre',
)
pipeline = Pipeline([
    ('users', user_split),
    ('lin', lin),
])
# control_df, test_df = pipeline.transform(sessions)[0]  # список из двух таблиц
# далее CupedLinearRegressionTTest(covariate_names=['lin_pre'], value_column='linearized_values', ...)

Для пулового преобразования CupedProcessor нужен один общий DataFrame; его можно собрать из объединённых сессий с колонкой group и отдельным пайплайном, либо опираться на регрессионный CUPED в тесте.


RegressionProcessor

Исходный код

Процессор на основе регрессии (в данный момент отключён)

В исходном коде текущей версии RegressionProcessor помечен как не реализованный (NotImplementedError) и не предназначен для использования в продакшене.
Интерфейс может измениться в будущих версиях, поэтому примеры использования здесь не приводятся намеренно, чтобы не вводить в заблуждение.

Для снижения дисперсии с помощью ковариат в актуальной версии рекомендуется использовать CupedProcessor или тест CupedLinearRegressionTTest.


RenameColsProcessor

Исходный код

Процессор для переименования колонок

Переименовывает колонки в DataFrame согласно заданному маппингу. Полезно для стандартизации названий колонок перед применением тестов.

Инициализация

def __init__(self, rename_map: Dict[str, str])

Параметры

  • rename_map (Dict[str, str]): Словарь для переименования колонок (старое_имя: новое_имя).

Методы

transform

def transform(self, data: pd.DataFrame) -> Tuple[pd.DataFrame, dict]

Переименовывает колонки в данных.

Параметры:

  • data (pd.DataFrame): Исходные данные.

Возвращает:

  • Tuple[pd.DataFrame, dict]: Данные с переименованными колонками и словарь артефактов (пустой).

Примеры использования

import pandas as pd
from aboba.processing import RenameColsProcessor

# Создаем данные с нестандартными названиями
data = pd.DataFrame({
    'user_metric': [1, 2, 3, 4, 5],
    'exp_group': ['A', 'B', 'A', 'B', 'A']
})

# Переименовываем колонки
processor = RenameColsProcessor({
    'user_metric': 'target',
    'exp_group': 'group'
})

processed, _ = processor.transform(data)
print(f"Исходные колонки: {list(data.columns)}")
print(f"Новые колонки: {list(processed.columns)}")

EnsureColsProcessor

Исходный код

Процессор для проверки наличия колонок

Проверяет наличие необходимых колонок в данных и выбрасывает исключение, если какие-то колонки отсутствуют. Используется для валидации данных перед обработкой.

Инициализация

def __init__(self, cols: List[str])

Параметры

  • cols (List[str]): Список обязательных колонок.

Методы

transform

def transform(self, data: pd.DataFrame) -> Tuple[pd.DataFrame, dict]

Проверяет наличие обязательных колонок.

Параметры:

  • data (pd.DataFrame): Данные для проверки.

Возвращает:

  • Tuple[pd.DataFrame, dict]: Исходные данные (если все колонки присутствуют) и словарь артефактов (пустой).

Исключения: - ValueError: Если какие-то обязательные колонки отсутствуют.

Примеры использования

import pandas as pd
from aboba.processing import EnsureColsProcessor

# Создаем данные
data = pd.DataFrame({
    'target': [1, 2, 3],
    'group': ['A', 'B', 'A']
})

# Проверяем наличие колонок
processor = EnsureColsProcessor(['target', 'group'])
processed, _ = processor.transform(data)  # OK

# Попытка с отсутствующей колонкой
try:
    processor_strict = EnsureColsProcessor(['target', 'group', 'missing_col'])
    processor_strict.transform(data)
except AssertionError as e:
    print(f"Ошибка: {e}")

PreprocessorPipeline

Исходный код

Пайплайн препроцессоров

Объединяет несколько процессоров в единый пайплайн для последовательного применения. Упрощенная версия Pipeline, специально для препроцессинга.

Инициализация

def __init__(self, steps: List[BaseDataProcessor], verbose: bool = False, **kwargs)

Параметры

  • steps (List[BaseDataProcessor]): Список процессоров для последовательного применения.
  • verbose (bool): Логировать прогресс через tqdm. По умолчанию False.

Методы

fit

def fit(self, data: pd.DataFrame) -> 'PreprocessorPipeline'

Обучает все процессоры в пайплайне на данных (по очереди, передавая выход одного шага в следующий).

transform

def transform(self, data: pd.DataFrame) -> Tuple[pd.DataFrame, dict]

Применяет все процессоры к данным (предполагается, что fit уже был вызван).

fit_transform

def fit_transform(self, data: pd.DataFrame) -> Tuple[pd.DataFrame, dict]

Комбинация fit и transform за один вызов.

Примеры использования

import pandas as pd
import numpy as np
from aboba.processing import (
    PreprocessorPipeline,
    RenameColsProcessor,
    EnsureColsProcessor,
    CupedProcessor
)

# Создаем данные
np.random.seed(42)
data = pd.DataFrame({
    'metric': np.random.normal(100, 15, 1000),
    'pre_metric': np.random.normal(100, 15, 1000),
    'exp_group': np.random.choice(['control', 'test'], 1000)
})

# Создаем пайплайн препроцессоров
pipeline = PreprocessorPipeline([
    RenameColsProcessor({'metric': 'target', 'exp_group': 'group', 'pre_metric': 'covariate'}),
    EnsureColsProcessor(['target', 'group', 'covariate']),
    CupedProcessor(
        value_column='target',
        covariate_columns='covariate',
        result_column='target',
    ),
])

# Обучаем и применяем пайплайн
processed, _ = pipeline.fit_transform(data)