Version: 0.94

Developing and registering custom operator classes

Generally, parameters for custom operators are hard-coded within the user-defined function, which works well for simple, quick transformations. To create custom operators whose parameters can be specified in the Application Studio overview, we instead create custom operator classes.

As an example, let’s assume that we want to add an operator that scales a numeric column so that the mean of the column is 0 and its standard deviation is 1. The scaler custom operator we define should do so by subtracting a specified mean and dividing by a specified standard deviation. This operator takes a number of arguments that should be configurable in the Application Studio overview, such as field (the name of the column to scale), target_field (the new column name), mean, and std. To make these parameters configurable, we define an operator class.
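As a quick illustration of the underlying math (using plain pandas and NumPy, independent of Snorkel Flow), standard scaling subtracts the column mean and divides by the column standard deviation, producing a column with mean 0 and standard deviation 1:

```python
import numpy as np
import pandas as pd

# Toy column of values to scale.
df = pd.DataFrame({"income": [30.0, 50.0, 70.0]})

# Standard scaling: subtract the mean, divide by the standard deviation.
mean = np.mean(df["income"])  # 50.0
std = np.std(df["income"])    # population std, ~16.33

df["income_scaled"] = (df["income"] - mean) / std

print(df["income_scaled"].mean())      # ~0.0
print(np.std(df["income_scaled"]))     # ~1.0
```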

Defining the class

To develop an operator class, we write a class in the Notebook that inherits from the Featurizer class. We need to define several methods as part of this new operator class: __init__, input_schema, output_schema, _compute_features, and (optionally) _fit.

Let’s take a look at each of these methods individually as they are added to the Notebook.

__init__ defines the arguments that will be configured as part of our newly created operator, which will be called CustomStandardScaler. These values will be customizable in the Application Studio overview page. As such, we place field, target_field, mean, and std as the parameters of __init__ and initialize them as instance variables.

from typing import Any, Dict, Optional

from snorkelflow.operators.featurizer import Featurizer
from snorkelflow.operators.operator import ColSchema

class CustomStandardScaler(Featurizer):
    """Preprocessor that scales a numerical column to mean=0 and std=1."""

    def __init__(
        self,
        field: str,
        target_field: Optional[str] = None,
        mean: float = 0.0,
        std: float = 1.0,
    ) -> None:
        self.field = field
        self.target_field = target_field or f"{field}_scaled"
        if std is not None and std < 0:
            raise ValueError("std must be >= 0")
        self.mean = mean
        self.std = std

Custom operator classes enable us to define an optional _fit method for trainable operators. Fit functionality is very useful for operators with parameters that depend on the dataset itself. In this example, mean and std are configurable parameters that need to be calculated on the field column.

The class method _fit is generally used to automatically fit the operator arguments based on training data. This method takes in the dataframe df and additional parameters for the fit, in this case field and target_field. _fit should return a dictionary of kwargs matching those specified in __init__. In this example, the necessary mean and std are calculated using the numpy library and added as arguments along with field and target_field.

import pandas as pd

class CustomStandardScaler(Featurizer):
    ...
    @classmethod
    def _fit(cls, df: pd.DataFrame, field: str, target_field: str) -> Dict[str, Any]:  # type: ignore
        import numpy as np
        kwargs = {
            "field": field,
            "target_field": target_field,
            "mean": np.mean(df[field]),
            "std": np.std(df[field]),
        }
        return kwargs
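To see the fitting contract in isolation, the same logic can be sketched as a standalone function (an illustration using plain pandas and NumPy, not the SDK API); note that the returned dictionary's keys line up exactly with the parameters of __init__:

```python
from typing import Any, Dict

import numpy as np
import pandas as pd

def fit_scaler_kwargs(df: pd.DataFrame, field: str, target_field: str) -> Dict[str, Any]:
    """Mirrors CustomStandardScaler._fit: compute mean/std on the training data."""
    return {
        "field": field,
        "target_field": target_field,
        "mean": float(np.mean(df[field])),
        "std": float(np.std(df[field])),
    }

train_df = pd.DataFrame({"income": [30.0, 50.0, 70.0]})
kwargs = fit_scaler_kwargs(train_df, "income", "income_scaled")
# Because the keys match __init__, the dictionary can be splatted
# straight into the constructor: CustomStandardScaler(**kwargs)
```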

input_schema and output_schema are properties that define the input and output dataframe formats of this operator. Specifically, CustomStandardScaler will operate on a numerical (float) column with the name passed in via self.field. Additionally, the output column will be numerical (float) valued and have the name passed in via self.target_field.

class CustomStandardScaler(Featurizer):
    field: str
    target_field: str

    @property
    def input_schema(self) -> ColSchema:
        return {self.field: float}

    @property
    def output_schema(self) -> ColSchema:
        return {self.target_field: float}

_compute_features will contain the actual scaling logic and dictates how the transformation is calculated on the input dataframe input_df. As mentioned before, the scaling logic will simply subtract the mean from the column and divide it by the std. Finally, we return the resulting dataframe.

class CustomStandardScaler(Featurizer):
    field: str
    target_field: str
    std: float
    mean: float

    def _compute_features(self, input_df: pd.DataFrame, callback: Optional[OpProgressCallback] = None) -> pd.DataFrame:
        """Override base class method"""
        input_df[self.target_field] = (input_df[self.field] - self.mean) / self.std
        return input_df

Here is the full code block which defines our CustomStandardScaler operator class. Note that custom operators compute on dataframes as inputs and outputs.

from typing import Any, Dict, Optional

import pandas as pd

from snorkelflow.operators.featurizer import Featurizer, OpProgressCallback
from snorkelflow.operators.operator import ColSchema


class CustomStandardScaler(Featurizer):
    """Preprocessor that scales a numerical column to mean=0 and std=1."""

    def __init__(
        self,
        field: str,
        target_field: Optional[str] = None,
        mean: float = 0.0,
        std: float = 1.0,
    ) -> None:
        self.field = field
        self.target_field = target_field or f"{field}_scaled"
        if std is not None and std < 0:
            raise ValueError("std must be >= 0")
        self.mean = mean
        self.std = std

    @classmethod
    def _fit(cls, df: pd.DataFrame, field: str, target_field: str) -> Dict[str, Any]:  # type: ignore
        import numpy as np
        kwargs = {
            "field": field,
            "target_field": target_field,
            "mean": np.mean(df[field]),
            "std": np.std(df[field]),
        }
        return kwargs

    @property
    def input_schema(self) -> ColSchema:
        return {self.field: float}

    @property
    def output_schema(self) -> ColSchema:
        return {self.target_field: float}

    def _compute_features(self, input_df: pd.DataFrame, callback: Optional[OpProgressCallback] = None) -> pd.DataFrame:
        """Override base class method"""
        input_df[self.target_field] = (input_df[self.field] - self.mean) / self.std
        return input_df
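Before registering the class, you can sanity-check the transformation logic on a toy dataframe. The sketch below reproduces the arithmetic of _compute_features standalone (the parameter values are hypothetical, as if produced by _fit on training data), so it runs without the Snorkel Flow imports:

```python
import pandas as pd

# Hypothetical fitted parameters, as _fit would produce on training data.
field, target_field = "income", "income_scaled"
mean, std = 50.0, 16.329931618554521

input_df = pd.DataFrame({"income": [30.0, 50.0, 70.0]})

# Same arithmetic as _compute_features.
input_df[target_field] = (input_df[field] - mean) / std

print(input_df[target_field].round(4).tolist())  # [-1.2247, 0.0, 1.2247]
```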

Registering the class

After defining the operator class, we are now ready to register this class for use in Snorkel Flow.

We do so by calling the add_operator_class SDK function and passing in our class definition, CustomStandardScaler.

sf.add_operator_class(CustomStandardScaler, overwrite=True)

Use a custom operator class

In the Application Studio overview, click on the hamburger icon on the Model node, select Add node before, then select ChangeColumn. Next, click on the added Featurizer node and select the CustomStandardScaler operator that we just added. Since we defined a class, the custom operator now acts as a built-in operator with fully configurable parameters, as defined in our __init__. Simply enter the necessary parameters and commit the custom operator like any other built-in operator.

How to add a node in DAG

Fit an operator in Snorkel Flow

Once we have a trainable operator with fit functionality, we can revisit the Application Studio overview and examine the operator config of our newly created CustomStandardScaler operator.

For operators with _fit defined, a Fit Values button will appear on the bottom left. Clicking on it opens a separate modal that takes in the necessary arguments. In this example, if we want to scale the Income column and place the result in a new column, Income_scaled, we simply select those fields and click Fit.

How to fit an operator in DAG

After the operator has finished fitting to the data, the previous operator config modal will be automatically populated with the result. The operator can then be committed just like any other operator.

The result of fitting an operator in DAG

Investigating issues

If you encounter any issues and want to understand the details, you can enable debug mode in the Snorkel Flow SDK. For example, to investigate issues when registering the operator class, run:

ctx.set_debug(True)
sf.add_operator_class(CustomStandardScaler, overwrite=True)

This shows the detailed error message and stack trace, which can help you fix the custom operator class.