# Data Processing

Data processors for variance reduction and data transformation.
## BucketProcessor

Bases: `BaseDataProcessor`

Processor that assigns rows to buckets based on a hash of a source column.

This processor creates a new column of bucket assignments by hashing the values in a source column and taking the result modulo the number of buckets. The same source value therefore always maps to the same bucket.
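The hash-and-modulo idea can be sketched as follows. This is a minimal illustration, not the library's exact hashing scheme; note that Python's built-in `hash` is salted per process for strings, so a stable digest such as MD5 is used here to keep assignments reproducible across runs:

```python
import hashlib

import pandas as pd

def assign_bucket(value, n_buckets: int) -> int:
    """Map a value to one of n_buckets buckets via a stable hash, then modulo."""
    digest = hashlib.md5(str(value).encode("utf-8")).hexdigest()
    return int(digest, 16) % n_buckets

data = pd.DataFrame({"user_id": ["user1", "user2", "user3", "user4", "user5"]})
data["bucket"] = data["user_id"].map(lambda v: assign_bucket(v, n_buckets=3))

# The same source value always lands in the same bucket.
assert assign_bucket("user1", 3) == assign_bucket("user1", 3)
```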
Examples:

```python
import pandas as pd
from aboba.processing.bucket_processor import BucketProcessor

# Create sample data
data = pd.DataFrame({
    'user_id': ['user1', 'user2', 'user3', 'user4', 'user5']
})

# Assign users to 3 buckets
processor = BucketProcessor(
    source_column='user_id',
    result_column='bucket',
    n_buckets=3
)
processed_data, _ = processor.transform(data)
print(processed_data)
```
Source code in aboba/processing/bucket_processor.py
## CupedProcessor

Bases: `BaseDataProcessor`
Performs CUPED (Controlled-experiment Using Pre-Experiment Data) transformation.
CUPED is a variance reduction technique that uses pre-experiment data (covariates) to improve the sensitivity of A/B tests. It adjusts the target metric using information from covariates that are correlated with the target metric.
The transformation fits a linear regression Y = a + b^T X (without a treatment variable) and produces Z = Y - b^T X. Note that the intercept is not subtracted, so the mean of Z equals the intercept, preserving interpretability.
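The transformation above can be sketched directly with NumPy (a minimal single-covariate illustration using the closed-form OLS slope; the processor itself fits the regression as described in its source):

```python
import numpy as np

rng = np.random.default_rng(0)
n = 10_000
x = rng.normal(100, 10, n)            # pre-experiment covariate X
y = 2 * x + rng.normal(0, 5, n)       # target metric Y, correlated with X

# OLS slope of Y on X (single covariate): b = cov(X, Y) / var(X)
b = np.cov(x, y)[0, 1] / np.var(x, ddof=1)

# Subtract only the slope contribution; the intercept is not subtracted,
# so mean(z) equals the fitted intercept a = mean(y) - b * mean(x).
z = y - b * x

# Variance of the CUPED metric is lower than that of the raw metric.
assert np.var(z) < np.var(y)
```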
For RelativeIndependentTTest compatibility, saves the original control group mean in artifacts for correct relative effect calculation.
Examples:

```python
import pandas as pd
import numpy as np
from aboba.processing.cuped_processor import CupedProcessor

# Create sample data with pre-experiment data
np.random.seed(42)
n = 1000

# Pre-experiment metric (covariate)
pre_metric = np.random.normal(100, 10, n)

# Post-experiment metric (target) - correlated with pre-metric
post_metric = 2 * pre_metric + np.random.normal(0, 5, n)

# Add treatment effect for second half
post_metric[n//2:] += 5

data = pd.DataFrame({
    'group': [0] * (n//2) + [1] * (n//2),
    'pre_metric': pre_metric,
    'post_metric': post_metric
})

# Apply CUPED transformation (single covariate)
processor = CupedProcessor(
    value_column='post_metric',
    covariate_columns='pre_metric',
    result_column='cuped_metric',
    group_column='group',
    group_control=0,
)
processed_data, artifacts = processor.transform(data)
print(f"Original variance: {data['post_metric'].var():.2f}")
print(f"CUPED variance: {processed_data['cuped_metric'].var():.2f}")
if 'cuped_original_control_mean' in artifacts:
    print(f"Original control mean: {artifacts['cuped_original_control_mean']:.2f}")
```
Source code in aboba/processing/cuped_processor.py
### __init__

```python
__init__(
    value_column: str,
    covariate_columns: Union[str, List[str]],
    result_column: str,
    group_column: Optional[str] = None,
    group_test: Any = None,
    group_control: Any = None,
)
```
Initialize the CUPED processor.
| Parameter | Type | Description |
|---|---|---|
| `value_column` | `str` | Name of the column with the target metric values. |
| `covariate_columns` | `Union[str, List[str]]` | Name(s) of the column(s) with covariate values. Accepts a single column name or a list of names. |
| `result_column` | `str` | Name of the column to store the CUPED-transformed values. |
| `group_column` | `Optional[str]` | Column name for group identification. Required for saving the original control mean when used with `RelativeIndependentTTest`. Defaults to `None`. |
| `group_test` | `Any` | Value in `group_column` that identifies the test group (reserved for future use). Defaults to `None`. |
| `group_control` | `Any` | Value in `group_column` that identifies the control group. Required for saving the original control mean when used with `RelativeIndependentTTest`. Defaults to `None`. |
Source code in aboba/processing/cuped_processor.py
### transform

Apply CUPED transformation to the data.

Fits an OLS regression Y = a + b^T X on all rows (no treatment variable), then computes Z = Y - b^T X, subtracting only the slope contribution.
Saves the original control group mean in artifacts for proper relative effect calculation when group_column and group_control are specified.
| Parameter | Type | Description |
|---|---|---|
| `data` | `DataFrame` | DataFrame to transform. |

| Returns | Description |
|---|---|
| `Tuple[pd.DataFrame, Dict[str, Any]]` | Transformed DataFrame with CUPED values, plus an artifacts dict containing `'cuped_original_control_mean'` (original mean of the control group, if group info is provided) and `'cuped_value_column'` (name of the original value column). |
Source code in aboba/processing/cuped_processor.py
## RegressionProcessor

Bases: `BaseDataProcessor`
Performs regression-based transformation for CUPED or other adjustments.
This processor uses linear regression to adjust target metrics based on covariates. It can be used for CUPED transformation or other regression-based adjustments.
Examples:

```python
import pandas as pd
import numpy as np
from aboba.processing.regression_processor import RegressionProcessor

# Create sample data
np.random.seed(42)
n = 1000
covariate1 = np.random.normal(0, 1, n)
covariate2 = np.random.normal(0, 1, n)

# Target variable with some relationship to covariates
target = 2 * covariate1 + 1.5 * covariate2 + np.random.normal(0, 0.5, n)

# Add group column
group = ['control'] * (n//2) + ['test'] * (n//2)

# Add treatment effect
target[n//2:] += 2

data = pd.DataFrame({
    'target': target,
    'covariate1': covariate1,
    'covariate2': covariate2,
    'group': group
})

# Apply regression adjustment
processor = RegressionProcessor(
    value_column='target',
    covariate_columns=['covariate1', 'covariate2'],
    result_column='adjusted_target',
    group_column='group',
    group_test='test',
    group_control='control'
)
processed_data, _ = processor.transform(data)
print(f"Original mean: {data['target'].mean():.2f}")
print(f"Adjusted mean: {processed_data['adjusted_target'].mean():.2f}")
```
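The core of a regression-based adjustment with multiple covariates can be sketched as below. This is a hedged illustration using NumPy least squares; the processor's exact fitting strategy (for example, whether it fits per group) follows its source:

```python
import numpy as np

rng = np.random.default_rng(42)
n = 1_000
X = rng.normal(0, 1, (n, 2))                          # two covariates
y = X @ np.array([2.0, 1.5]) + rng.normal(0, 0.5, n)  # target

# Fit y = a + b^T X by least squares (column of ones for the intercept).
design = np.column_stack([np.ones(n), X])
coef, *_ = np.linalg.lstsq(design, y, rcond=None)
b = coef[1:]

# Adjusted metric: remove the covariate contribution, keep the intercept.
y_adj = y - X @ b

# The adjusted metric has lower variance than the raw target.
assert np.var(y_adj) < np.var(y)
```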
Source code in aboba/processing/regression_processor.py
### __init__

```python
__init__(
    value_column: str,
    covariate_columns: List[str],
    result_column: str,
    group_column: Optional[str] = None,
    group_test: Any = None,
    group_control: Any = None,
)
```
Initialize the RegressionProcessor.
| Parameter | Type | Description |
|---|---|---|
| `value_column` | `str` | Name of the column with the target values. |
| `covariate_columns` | `List[str]` | List of column names with covariate values. |
| `result_column` | `str` | Name of the column to store the adjusted values. |
| `group_column` | `Optional[str]` | Name of the column with group labels. If `None`, all data is used for regression. Defaults to `None`. |
| `group_test` | `Any` | Value in `group_column` that identifies the test group. Defaults to `None`. |
| `group_control` | `Any` | Value in `group_column` that identifies the control group. Defaults to `None`. |
Source code in aboba/processing/regression_processor.py
### transform

Apply regression transformation to the data.

| Parameter | Type | Description |
|---|---|---|
| `data` | `DataFrame` | DataFrame to transform. |

| Returns | Description |
|---|---|
| `Tuple[pd.DataFrame, Dict]` | Transformed DataFrame and an empty artifacts dict. |
Source code in aboba/processing/regression_processor.py
## RenameColsProcessor

Bases: `BaseDataProcessor`
Renames columns in a DataFrame according to a mapping dictionary.
This processor takes a dictionary that maps old column names to new column names and applies the renaming to the DataFrame.
Examples:

```python
import pandas as pd
from aboba.processing.rename_cols_processor import RenameColsProcessor

# Create sample data
data = pd.DataFrame({
    'old_name1': [1, 2, 3],
    'old_name2': ['a', 'b', 'c']
})

# Rename columns
processor = RenameColsProcessor({
    'old_name1': 'new_name1',
    'old_name2': 'new_name2'
})
processed_data, _ = processor.transform(data)
print(processed_data.columns.tolist())
```
Source code in aboba/processing/rename_cols_processor.py
## EnsureColsProcessor

Bases: `BaseDataProcessor`

Verifies that specified columns are present in the DataFrame.
This processor checks that all required columns are present in the DataFrame and raises an assertion error if any are missing. It's useful for validating data before processing.
Examples:

```python
import pandas as pd
from aboba.processing.ensure_cols_processor import EnsureColsProcessor

# Create sample data
data = pd.DataFrame({
    'col1': [1, 2, 3],
    'col2': ['a', 'b', 'c']
})

# Verify required columns are present
processor = EnsureColsProcessor(['col1', 'col2'])
processed_data, _ = processor.transform(data)  # No error
print("Columns verified successfully")

# This would raise an AssertionError:
# processor = EnsureColsProcessor(['col1', 'missing_col'])
# processor.transform(data)
```
Source code in aboba/processing/ensure_cols_processor.py
### transform

Verify that all required columns are present in the data.

| Parameter | Type | Description |
|---|---|---|
| `data` | `DataFrame` | DataFrame to verify. |

| Returns | Description |
|---|---|
| `Tuple[pd.DataFrame, Dict]` | Original DataFrame and an empty artifacts dict. |

| Raises | Description |
|---|---|
| `AssertionError` | If any required column is missing from the DataFrame. |
Source code in aboba/processing/ensure_cols_processor.py
## PreprocessorPipeline

Bases: `BaseDataProcessor`
Processes data through a sequence of preprocessing steps.
This pipeline applies a series of data processors in order, passing the output of each step as input to the next. It supports both fitting (for processors that need to learn from data) and transforming.
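The chaining logic can be sketched as follows. The two processors here are hypothetical toys invented for illustration; the real pipeline works with processors implementing the `BaseDataProcessor` contract, and its exact artifact handling follows its source:

```python
import pandas as pd

class UppercaseNames:
    """Toy processor: fit is a no-op, transform upper-cases the 'name' column."""
    def fit(self, data):
        return self
    def transform(self, data):
        out = data.copy()
        out["name"] = out["name"].str.upper()
        return out, {"uppercased": True}

class AddLength:
    """Toy processor: adds a name-length column."""
    def fit(self, data):
        return self
    def transform(self, data):
        out = data.copy()
        out["name_len"] = out["name"].str.len()
        return out, {}

def run_pipeline(steps, data):
    """Each step consumes the previous step's output;
    artifacts from all steps are merged into one dict."""
    artifacts = {}
    for step in steps:
        step.fit(data)
        data, step_artifacts = step.transform(data)
        artifacts.update(step_artifacts)
    return data, artifacts

data = pd.DataFrame({"name": ["ann", "bob"]})
result, artifacts = run_pipeline([UppercaseNames(), AddLength()], data)
```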
Examples:

```python
import pandas as pd
from aboba.processing.preprocessor_pipeline import PreprocessorPipeline
from aboba.processing.rename_cols_processor import RenameColsProcessor
from aboba.processing.ensure_cols_processor import EnsureColsProcessor

# Create sample data
data = pd.DataFrame({
    'old_name': [1, 2, 3, 4, 5],
    'other_col': ['a', 'b', 'c', 'd', 'e']
})

# Create a pipeline with two steps
pipeline = PreprocessorPipeline([
    RenameColsProcessor({'old_name': 'new_name'}),
    EnsureColsProcessor(['new_name', 'other_col'])
], verbose=True)

# Fit and transform the data
processed_data, artifacts = pipeline.fit_transform(data)
print(processed_data.columns.tolist())
```
Source code in aboba/processing/preprocessor_pipeline.py
### fit

Called once on all available data; each processor step receives the processed data from the previous step.

| Parameter | Type | Description |
|---|---|---|
| `data` | `DataFrame` | Full data, with all groups. |
Source code in aboba/processing/preprocessor_pipeline.py
### transform

Transforms the data step by step. `fit` must be called before transforming.

| Parameter | Type | Description |
|---|---|---|
| `data` | `DataFrame` | Data to transform. |

| Returns | Description |
|---|---|
| `Tuple[pd.DataFrame, Dict]` | Tuple with the processed DataFrame and an artifacts dict. |
Source code in aboba/processing/preprocessor_pipeline.py
### fit_transform

Combination of fit and transform.

| Parameter | Type | Description |
|---|---|---|
| `data` | `DataFrame` | Data to fit on and to transform. |

| Returns | Description |
|---|---|
| `Tuple[pd.DataFrame, Dict]` | Tuple with the processed DataFrame and an artifacts dict. |