Developing and registering custom operator classes
Generally, parameters for custom operators are hard-coded within the user-defined function. This approach suits simple, quick transformations. To create custom operators whose parameters can be specified in the Application Studio overview, we instead create custom operator classes.
As an example, let’s assume that we want to add an operator that scales a numeric column so that its mean is 0 and its standard deviation is 1. The scaler custom operator we define does so by subtracting a specified mean and dividing by a specified standard deviation. The operator takes several arguments that should be configurable in the Application Studio overview: field (the name of the column to scale), target_field (the new column name), mean, and std. To make these arguments configurable, we define an operator class.
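As a quick illustration of the scaling itself, independent of Snorkel Flow, the following sketch applies the subtract-and-divide step to a small pandas column. The column name Income and its values are made up for the example.

```python
import numpy as np
import pandas as pd

# Made-up data; "Income" is only an illustrative column name.
df = pd.DataFrame({"Income": [30_000.0, 45_000.0, 60_000.0, 75_000.0]})

values = df["Income"].to_numpy()
mean = float(np.mean(values))  # 52500.0
std = float(np.std(values))    # population standard deviation (ddof=0)

# Subtract the mean, then divide by the standard deviation.
df["Income_scaled"] = (df["Income"] - mean) / std
```

After this step the Income_scaled column has a mean of approximately 0 and a (population) standard deviation of approximately 1.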
Defining the class
To develop an operator class, we write a class in Notebook that inherits from the Featurizer class. We need to define several methods as part of this new operator class: __init__, input_schema, output_schema, _compute_features, and _fit (optional).
Let’s take a look at each of these methods individually as they are added to the Notebook.
__init__ defines the arguments that will be configured as part of our newly created operator, which will be called CustomStandardScaler. These values will be customizable in the Application Studio overview page. As such, we place field, target_field, mean, and std as the parameters of __init__ and initialize these as instance variables.
from typing import Any, Dict, Optional

from snorkelflow.operators.featurizer import Featurizer
from snorkelflow.operators.operator import ColSchema


class CustomStandardScaler(Featurizer):
    """Preprocessor that scales a numerical column to mean=0 and std=1."""

    def __init__(
        self,
        field: str,
        target_field: Optional[str] = None,
        mean: float = 0.0,
        std: float = 1.0,
    ) -> None:
        self.field = field
        # Default the output column name to "<field>_scaled".
        self.target_field = target_field or f"{field}_scaled"
        if std is not None and std < 0:
            raise ValueError("std must be >= 0")
        self.mean = mean
        self.std = std
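Note that the check above rejects negative values but lets std=0 through, and dividing a pandas column by zero does not raise an error; it yields inf or NaN. Below is a minimal sketch of a stricter guard, assuming you want to reject zero as well. The helper name check_std is hypothetical and not part of Snorkel Flow.

```python
import math

import pandas as pd


def check_std(std: float) -> float:
    """Hypothetical helper: reject non-positive or non-finite std values."""
    if not math.isfinite(std) or std <= 0:
        raise ValueError("std must be a finite number > 0")
    return std


# Dividing by zero in pandas does not raise; it silently produces inf/NaN.
col = pd.Series([1.0, 2.0, 3.0])
scaled = (col - 2.0) / 0.0  # -> [-inf, NaN, inf]
```

Whether to fail fast or to tolerate a zero standard deviation (for example, for a constant column) is a design choice the original example leaves open.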
Custom operator classes enable us to define an optional _fit method for trainable operators. Fit functionality is very useful for operators with parameters that depend on the dataset itself. In this example, mean and std are configurable parameters that need to be calculated on the field column.
The class method _fit is generally used to automatically fit the operator arguments based on training data. This method takes in the dataframe df and additional parameters for the fit, in this case field and target_field. _fit should return a dictionary of kwargs matching those specified in __init__. In this example, the necessary mean and std are calculated using the numpy library and added as arguments along with field and target_field.
import pandas as pd


class CustomStandardScaler(Featurizer):
    ...

    @classmethod
    def _fit(cls, df: pd.DataFrame, field: str, target_field: str) -> Dict[str, Any]:  # type: ignore
        import numpy as np

        # The returned keys match the parameters of __init__.
        kwargs = {
            "field": field,
            "target_field": target_field,
            "mean": np.mean(df[field]),
            "std": np.std(df[field]),
        }
        return kwargs
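One detail worth knowing when fitting: np.std defaults to the population standard deviation (ddof=0), whereas pandas' Series.std defaults to the sample standard deviation (ddof=1). The sketch below, on made-up data, shows the difference. Which one you want is a modeling choice and not something the example above prescribes.

```python
import numpy as np
import pandas as pd

col = pd.Series([2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0])  # made-up values

population_std = float(np.std(col.to_numpy()))  # ddof=0 -> 2.0
sample_std = float(col.std())                   # ddof=1, slightly larger
```

On small datasets the two values can differ noticeably; on large ones the gap becomes negligible.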
input_schema and output_schema are properties that define the format of the operator's input and output dataframes. Specifically, CustomStandardScaler will operate on a numerical (float) column with the name passed in as self.field. Additionally, the output column will be float-valued and have the name passed in as self.target_field.
class CustomStandardScaler(Featurizer):
    field: str
    target_field: str

    @property
    def input_schema(self) -> ColSchema:
        return {self.field: float}

    @property
    def output_schema(self) -> ColSchema:
        return {self.target_field: float}
_compute_features contains the actual scaling logic and dictates how the transformation is calculated on the input dataframe input_df. As mentioned before, the scaling logic simply subtracts the mean from the column and divides the result by the std. Finally, we return the resulting dataframe.
import pandas as pd
from snorkelflow.operators.featurizer import Featurizer, OpProgressCallback


class CustomStandardScaler(Featurizer):
    field: str
    target_field: str
    std: float
    mean: float

    def _compute_features(
        self, input_df: pd.DataFrame, callback: Optional[OpProgressCallback] = None
    ) -> pd.DataFrame:
        """Override base class method"""
        input_df[self.target_field] = (input_df[self.field] - self.mean) / self.std
        return input_df
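The assignment above writes the new column into input_df itself, so the caller's dataframe is modified as well. Whether that matters depends on how the platform passes dataframes to operators, which this guide does not specify. If you prefer to leave the input untouched, one option is DataFrame.assign, which returns a new dataframe. The function name scale_column below is hypothetical and stands in for the body of _compute_features.

```python
import pandas as pd


def scale_column(
    input_df: pd.DataFrame, field: str, target_field: str, mean: float, std: float
) -> pd.DataFrame:
    """Hypothetical stand-in for the scaling step; does not mutate input_df."""
    scaled = (input_df[field] - mean) / std
    # assign() returns a copy with the extra column instead of editing in place.
    return input_df.assign(**{target_field: scaled})


df = pd.DataFrame({"x": [1.0, 2.0, 3.0]})  # made-up data
out = scale_column(df, "x", "x_scaled", mean=2.0, std=1.0)
```

Here df keeps its single column x, while out carries both x and x_scaled.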
Here is the full code block which defines our CustomStandardScaler operator class. Note that custom operators take dataframes as inputs and return dataframes as outputs.
from typing import Any, Dict, Optional

import pandas as pd
from snorkelflow.operators.featurizer import Featurizer, OpProgressCallback
from snorkelflow.operators.operator import ColSchema


class CustomStandardScaler(Featurizer):
    """Preprocessor that scales a numerical column to mean=0 and std=1."""

    def __init__(
        self,
        field: str,
        target_field: Optional[str] = None,
        mean: float = 0.0,
        std: float = 1.0,
    ) -> None:
        self.field = field
        # Default the output column name to "<field>_scaled".
        self.target_field = target_field or f"{field}_scaled"
        if std is not None and std < 0:
            raise ValueError("std must be >= 0")
        self.mean = mean
        self.std = std

    @classmethod
    def _fit(cls, df: pd.DataFrame, field: str, target_field: str) -> Dict[str, Any]:  # type: ignore
        import numpy as np

        # The returned keys match the parameters of __init__.
        kwargs = {
            "field": field,
            "target_field": target_field,
            "mean": np.mean(df[field]),
            "std": np.std(df[field]),
        }
        return kwargs

    @property
    def input_schema(self) -> ColSchema:
        return {self.field: float}

    @property
    def output_schema(self) -> ColSchema:
        return {self.target_field: float}

    def _compute_features(
        self, input_df: pd.DataFrame, callback: Optional[OpProgressCallback] = None
    ) -> pd.DataFrame:
        """Override base class method"""
        input_df[self.target_field] = (input_df[self.field] - self.mean) / self.std
        return input_df
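Before registering, it can help to exercise the fit and scaling logic locally. The sketch below does not import snorkelflow. LocalScaler is a hypothetical plain-Python stand-in that copies only the fit and scaling logic from the class above, so it checks the arithmetic and nothing about how Snorkel Flow invokes operators.

```python
from typing import Any, Dict

import numpy as np
import pandas as pd


class LocalScaler:
    """Hypothetical stand-in mirroring CustomStandardScaler's logic (no Snorkel Flow)."""

    def __init__(self, field: str, target_field: str, mean: float, std: float) -> None:
        self.field = field
        self.target_field = target_field
        self.mean = mean
        self.std = std

    @classmethod
    def fit(cls, df: pd.DataFrame, field: str, target_field: str) -> Dict[str, Any]:
        # Same idea as _fit: compute mean/std and return kwargs for __init__.
        values = df[field].to_numpy()
        return {
            "field": field,
            "target_field": target_field,
            "mean": float(np.mean(values)),
            "std": float(np.std(values)),
        }

    def compute(self, input_df: pd.DataFrame) -> pd.DataFrame:
        # Same arithmetic as _compute_features.
        input_df[self.target_field] = (input_df[self.field] - self.mean) / self.std
        return input_df


df = pd.DataFrame({"Income": [10.0, 20.0, 30.0, 40.0]})  # made-up data
kwargs = LocalScaler.fit(df, "Income", "Income_scaled")
result = LocalScaler(**kwargs).compute(df)

# The fitted kwargs feed straight back into __init__; the scaled column
# should have mean ~0 and (population) std ~1.
scaled = result["Income_scaled"].to_numpy()
```

The point of the exercise is the round trip: the dictionary returned by the fit step has exactly the keys that __init__ expects, which is the contract described for _fit above.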
Registering the class
After defining the operator class, we are now ready to register this class for use in Snorkel Flow.
We do so by calling the add_operator_class SDK function and passing in our class definition CustomStandardScaler.
sf.add_operator_class(CustomStandardScaler, overwrite=True)
Use a custom operator class
In the Application Studio overview, click on the hamburger icon on the Model node, select Add node before, then select ChangeColumn. Next, click on the added Featurizer node and select the CustomStandardScaler that we just added. Since we defined a class, the custom operator now acts as a built-in operator with fully configurable parameters that were defined in our __init__. Simply enter the necessary parameters and commit the custom operator like any other built-in operator.
Fit an operator in Snorkel Flow
Once we have a trainable operator with fit functionality, we can revisit the Application Studio overview and examine the operator config of our newly created operator CustomStandardScaler.
For operators with _fit defined, a Fit Values button will appear on the bottom left. Clicking it opens a separate modal that takes in the necessary arguments. In this example, if we wanted to scale the Income column and place the result in a new column Income_scaled, we simply select those fields and click Fit.
After the operator has finished fitting to the data, the previous operator config modal will be automatically populated with the result. The operator can then be committed just like any other operator.
Investigating issues
If you encounter any issues and want to understand the details, you can enable the debug mode of the Snorkel Flow SDK. For example, to investigate issues when registering the operator class, run:
ctx.set_debug(True)
sf.add_operator_class(CustomStandardScaler, overwrite=True)
This shows the detailed error message and stack trace, which can help you fix the custom operator class.