Developing and registering custom operators
In addition to Built-in Operators, you can develop a custom operator and use it as part of an application. In a nutshell, a custom operator is a Python function that accepts and outputs DataFrames, decorated by one of the Snorkel Flow operator decorators.
In this example, let’s assume that we want to add an operator that adds a column of random numbers after DateSpanExtractor.
Develop a custom operator
First, get the (output) DataFrame from DateSpanExtractor:
import snorkelflow.client as sf
DATASET_NAME = "contracts"
APP_NAME = "date_extraction"
# Get the application info
app = sf.get_application(APP_NAME)
# Get the node UID of DateSpanExtractor
node = sf.get_node_uid(APP_NAME, search_op_type="DateSpanExtractor")[0]
# Get the first 10 rows of the output DataFrame from DateSpanExtractor,
# which will become the **input** for the operator you develop
df = sf.get_node_output_data(application=APP_NAME, node=node, max_input_rows=10)
Note that the uid column of your data has been renamed to context_uid, and is now the index of the dataframe.
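The effect of that renaming can be reproduced on a toy frame. The sketch below uses plain pandas only; the `text` column and the UID values are made up for illustration, and it does not call Snorkel Flow.

```python
import pandas as pd

# Toy stand-in for a data source; column names and values are illustrative only.
raw = pd.DataFrame({"uid": [101, 102, 103], "text": ["a", "b", "c"]})

# Mimic the shape described above: `uid` renamed to `context_uid` and used as the index.
df_like_node_output = raw.rename(columns={"uid": "context_uid"}).set_index("context_uid")

# The identifier is no longer a regular column, so read it from the index...
uids = df_like_node_output.index.tolist()

# ...or turn it back into a column when an operator needs it as data.
with_uid_column = df_like_node_output.reset_index()
```

In practice this means an operator that looks up `df["uid"]` or `df["context_uid"]` as a column would raise a KeyError on such a frame; use `df.index` or `df.reset_index()` instead.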
Next, define your operator. Custom operators take a dataframe as input and must return a dataframe. As an example, we will create an operator that adds a column of random numbers.
import pandas as pd
from snorkelflow.operators import pandas_operator
@pandas_operator(
    name="add_rand_num", input_schema={}, output_schema={"rand_num": float},
)
def add_rand_num(df: pd.DataFrame) -> pd.DataFrame:
    import random
    df["rand_num"] = [random.random() for _ in range(len(df))]
    return df
While @pandas_operator lets you define an Operator using simple pandas syntax, Operators are executed over Dask DataFrames. To test the operator the way it will be executed, first convert df to a Dask DataFrame, then run the operator over the converted DataFrame and view the first rows. You can also test the operator directly against a pandas DataFrame in your notebook by calling add_rand_num.execute_df(df).
import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=1)
df_processed_in_nb = add_rand_num.execute([ddf]).compute()
print(df_processed_in_nb.head())
operator_test_in_notebook = add_rand_num.execute_df(df)
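Before involving Dask or Snorkel Flow at all, the operator's logic can also be checked as an ordinary function. This is a minimal sketch with plain pandas: `add_rand_num_logic` is a hypothetical undecorated copy of the function body, and the toy frame (including its `span_text` column) is invented for the check.

```python
import random

import pandas as pd

def add_rand_num_logic(df: pd.DataFrame) -> pd.DataFrame:
    # Same logic as the operator above, without the decorator.
    df = df.copy()  # keep the caller's frame untouched during this local check
    df["rand_num"] = [random.random() for _ in range(len(df))]
    return df

# Toy input indexed by context_uid, like the node output described earlier.
toy = pd.DataFrame(
    {"span_text": ["Jan 1, 2020", "2021-03-04"]},
    index=pd.Index([7, 8], name="context_uid"),
)
out = add_rand_num_logic(toy)

# Properties worth checking: rows and index unchanged, one new float column in [0, 1).
same_rows = out.index.equals(toy.index)
in_range = bool(((out["rand_num"] >= 0) & (out["rand_num"] < 1)).all())
```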
If we’re happy with the results, we can register our operator with Snorkel Flow:
operator_uid = sf.add_operator(
    operator=add_rand_num,
    overwrite=False
)["op_uid"]
Choosing the right custom decorator
Snorkel Flow provides several decorators for wrapping custom functions to produce an operator, depending on the operator’s purpose and properties. We describe some of the more common ones below.
- pandas_featurizer: A featurizer is an operator that only adds one or more columns (features) to a DataFrame, but does not add or delete any rows. Use this decorator when creating a featurizer. Most operators you end up creating will use this decorator. Note that when using pandas_featurizer, you must specify the input_schema (columns required by the operator) and output_schema (new columns created by the operator).
- span_normalizer: A normalizer is an operator that converts the span_text column into a canonical form, saved in a new normalized_span column. Use this decorator if you want to produce a normalizer, by decorating the function that maps a string (span text) to another string (normalized span).
- span_reducer: A reducer keeps a subset of rows of the original DataFrame, e.g. by choosing one span to keep per document. Use this decorator to create a reducer by defining the function that maps a pandas DataFrame to a reduced DataFrame (with a subset of the rows).
- row_filter: A row filter selects certain rows from the original DataFrame to enable conditional processing.
- field_extractor: An extractor operates on a DataFrame with one row per document, and produces a DataFrame with one row per span. Use this decorator to write a custom extractor by decorating a function that maps the document string to a list of spans (tuples of char_start, char_end, and an optional span_entity).
- pandas_operator: Use this decorator when you have a function that maps a pandas DataFrame to another pandas DataFrame, but is not of any of the above types (featurizer, normalizer, reducer, or extractor).
- page_splitter: (PDF apps only) This operator takes in a row corresponding to a document and defines how to split the document up into pages.
See snorkelflow.operators for the exhaustive list of available decorators.
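To make the function shapes in the list above concrete, here is a sketch of three undecorated functions: a normalizer (string to string), an extractor (document string to list of spans), and a reducer (DataFrame to a subset of its rows). The function names, the year regex, and the toy columns are illustrative assumptions; the exact signatures the decorators expect should be checked in snorkelflow.operators.

```python
import re
from typing import List, Tuple

import pandas as pd

def normalize_span(span_text: str) -> str:
    # Normalizer shape: one span string in, one canonical string out.
    return " ".join(span_text.lower().split())

def extract_years(doc: str) -> List[Tuple[int, int]]:
    # Extractor shape: document text in, list of (char_start, char_end) out.
    # Assumption: char_end follows Python's exclusive-end convention here;
    # adjust if the platform expects an inclusive end offset.
    return [(m.start(), m.end()) for m in re.finditer(r"\b(?:19|20)\d{2}\b", doc)]

def keep_first_span_per_doc(df: pd.DataFrame) -> pd.DataFrame:
    # Reducer shape: DataFrame in, subset of its rows out (earliest span per document).
    return df.sort_values("char_start").groupby("context_uid", sort=False).head(1)

doc = "Signed in 2019, renewed 2021."
years = [doc[start:end] for start, end in extract_years(doc)]

spans = pd.DataFrame({"context_uid": [1, 1, 2], "char_start": [30, 5, 12]})
reduced = keep_first_span_per_doc(spans)
```

A featurizer differs from the reducer in that its output has exactly the same rows as its input and only gains columns.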
Use a custom operator
In Application Studio
Click on the hamburger icon on the DateSpanExtractor node, select Add node after, then select ChangeColumn. Next, click on the added Featurizer node and select the add_rand_num operator that we just registered. If you do not see the Add node after option, see Compose complex operator graphs to enable this functionality.
In Notebook
Inserting an operator requires both input_node_uids and output_node_uid. You can look them up either manually in Application Studio or programmatically in a notebook. The snippet below retrieves those node UIDs, adds an uncommitted node, and finally commits the custom operator to this node.
import snorkelflow.client as sf
# Get the node uid of DateSpanExtractor, which will become the custom operator's input node
input_node = sf.get_node_uid(APP_NAME, search_op_type="DateSpanExtractor")[0]
# Get the node uid of SpanPreviewPreprocessor, which will become the custom operator's output node
output_node = sf.get_node_uid(APP_NAME, search_op_type="SpanPreviewPreprocessor")[0]
# First, add a node
node = sf.add_node(
    application=APP_NAME,
    input_node_uids=[input_node],
    expected_op_type="Featurizer",
    output_node_uid=output_node,
)["node_uid"]
# Then, commit the custom operator
sf.commit_custom_operator(
    node=node,
    operator_uid=operator_uid,
)
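Why both the input and output nodes are needed becomes clearer with a toy model of the graph. The sketch below is purely illustrative and is not how Snorkel Flow stores its DAG: the dictionary layout, the `insert_node` helper, and the node UIDs are all hypothetical.

```python
from typing import Dict, List

def insert_node(
    parents: Dict[int, List[int]],
    new_node: int,
    input_nodes: List[int],
    output_node: int,
) -> Dict[int, List[int]]:
    """Return a copy of `parents` (node -> its input nodes) with `new_node` spliced in."""
    updated = {node: list(inputs) for node, inputs in parents.items()}
    # The new node reads from the given input nodes...
    updated[new_node] = list(input_nodes)
    # ...and the output node now reads from the new node instead of those inputs.
    remaining = [p for p in updated[output_node] if p not in input_nodes]
    updated[output_node] = remaining + [new_node]
    return updated

# Hypothetical UIDs: 1 = extractor node, 2 = the node that used to follow it.
graph = {1: [], 2: [1]}
graph_after = insert_node(graph, new_node=3, input_nodes=[1], output_node=2)
```

Without the output node, the new node would hang off the extractor as a dead end; without the input nodes, it would have nothing to read from.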
You can quickly check whether the custom operator behaves as expected in a notebook.
# Get the first 10 rows of the output DataFrame from the added node
df_processed_in_pf = sf.get_node_output_data(application=APP_NAME, node=node, max_input_rows=10)
print(df_processed_in_pf.head())
When a node is added to the DAG, the active data sources at the model node become stale. To check that the model node receives the correct DataFrame from upstream, refresh the model node’s active data sources.
# Get the UID of the model node and refresh its active data sources
model_node = int(sf.get_model_nodes(APP_NAME)[0])
sf.refresh_active_datasources(model_node)
# Get the (input) DataFrame at the model node
df = sf.get_node_data(model_node)
print(df.head())
Developing a custom page splitter
Page splitters are specifically for PDF apps and are useful in making large documents easier to label and process on the platform. A user can define custom logic for splitting up a PDF by using the @page_splitter decorator. The function takes in a single row of a DataFrame (corresponding to a single document) and outputs a list of page indices in each split. The snippet below separates PDF documents into odd and even pages:
from typing import List
import pandas as pd
from snorkelflow.operators import page_splitter
@page_splitter(name="even_odd_pages")
def even_odd_pages(row: pd.Series) -> List[List[int]]:
    from snorkelflow.rich_docs.rich_doc import RichDocCols
    page_docs = row[RichDocCols.PAGE_DOCS]
    # Page indices 0, 2, 4, ... go into one split and 1, 3, 5, ... into the other
    even_splits = list(range(0, len(page_docs), 2))
    odd_splits = list(range(1, len(page_docs), 2))
    return [even_splits, odd_splits]
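The splitting logic itself depends only on the number of pages, so it can be tried out without a PDF or the decorator. Below is a sketch under that assumption; `fixed_size_split` is a hypothetical alternative strategy, not something provided by the library.

```python
from typing import List

def even_odd_split(num_pages: int) -> List[List[int]]:
    # Same index arithmetic as the splitter above, driven by a page count.
    even = list(range(0, num_pages, 2))
    odd = list(range(1, num_pages, 2))
    return [even, odd]

def fixed_size_split(num_pages: int, pages_per_split: int) -> List[List[int]]:
    # Hypothetical alternative: consecutive chunks of at most `pages_per_split` pages.
    if pages_per_split < 1:
        raise ValueError("pages_per_split must be at least 1")
    return [
        list(range(start, min(start + pages_per_split, num_pages)))
        for start in range(0, num_pages, pages_per_split)
    ]

five_pages_even_odd = even_odd_split(5)
five_pages_chunked = fixed_size_split(5, pages_per_split=2)
```

Because the indices start at 0, the "even" split contains the first, third, and fifth pages as a reader would count them.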
Developing a custom row filter
A user can define custom logic for filtering data by using the @row_filter decorator. The function takes in the entire DataFrame and outputs a pandas Series that is used as the filter. The snippet below filters rows of the DataFrame by looking for a specific regex pattern:
import re
from snorkelflow.operators import row_filter
import pandas as pd
pattern = re.compile(r"(?:hello|good morning)", flags=re.IGNORECASE)
@row_filter(
    name="greetings_filter",
    resources=dict(compiled_rgx=pattern),
)
def greetings_filter(df: pd.DataFrame, compiled_rgx: re.Pattern) -> pd.Series:
    text_col = df["text"]
    return text_col.apply(lambda x: bool(compiled_rgx.match(x)))
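The boolean Series returned by the filter function can be previewed with plain pandas. One detail worth knowing when adapting the example: `re.Pattern.match` only matches at the start of the string, while `search` matches anywhere. The sketch below uses an invented three-row frame to show the difference.

```python
import re

import pandas as pd

pattern = re.compile(r"(?:hello|good morning)", flags=re.IGNORECASE)
df = pd.DataFrame({"text": ["Hello there", "Well, good morning!", "Goodbye"]})

# `match` anchors at the beginning of each string, as in the filter above.
starts_with_greeting = df["text"].apply(lambda x: bool(pattern.match(x)))

# `search` finds the pattern anywhere in the string.
contains_greeting = df["text"].apply(lambda x: bool(pattern.search(x)))

# A boolean Series like these selects rows when used as a mask.
kept = df[starts_with_greeting]
```

If the intent is "rows that mention a greeting anywhere", `search` is the better fit; `match` is right when the text must begin with one.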
Advanced options
Including other functions and objects as resources
You may want to define an external helper function or rely on a previously constructed object within your operator. This is supported by bundling these objects into the operator before registering it with Snorkel Flow. You can either define them inside the operator function’s scope, or pass them in via the resources key (and add a corresponding keyword argument to the function signature).
For instance, you may want to reference a list of user ids within a PandasOperator:
from typing import List
import pandas as pd
from snorkelflow.operators import pandas_operator
@pandas_operator(name="Is allowlisted", input_schema={}, resources=dict(allowlist=[5, 20, 100]))
def is_allowlisted(df: pd.DataFrame, allowlist: List[int]) -> pd.DataFrame:
    df["is_allowlisted"] = [user_id in allowlist for user_id in df["user_id"]]
    return df
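The membership test inside this operator can be checked on a toy frame without Snorkel Flow. The sketch below also shows pandas' vectorized `Series.isin` as an equivalent alternative to the list comprehension; the user ids are made up.

```python
import pandas as pd

allowlist = [5, 20, 100]
df = pd.DataFrame({"user_id": [5, 7, 100]})

# Row-by-row membership test, as written in the operator above.
loop_result = [user_id in allowlist for user_id in df["user_id"]]

# Equivalent vectorized form; usually preferable for large frames.
vectorized = df["user_id"].isin(allowlist)
```

Both forms produce the same flags; for a long allowlist, converting it to a set before the loop avoids a linear scan per row.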
When using resources that are large (e.g. a spaCy model), use the resources_fn key to pass in a function that generates the resources instead of passing the resources themselves.
from typing import Any, Dict
import pandas as pd
from snorkelflow.operators import pandas_featurizer
def spacy_fn() -> Dict:
    import spacy
    nlp = spacy.load('en_core_web_sm')
    return {'nlp': nlp}
@pandas_featurizer(name="Tokenize", input_schema={'text': str}, output_schema={'tokens': str}, resources_fn=spacy_fn)
def tokenize(df: pd.DataFrame, nlp: Any) -> pd.DataFrame:
    df["tokens"] = df.text.apply(lambda x: nlp.tokenizer(x).to_json())
    return df
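The naming convention in the example above (the key 'nlp' in the returned dict lines up with the `nlp` keyword argument) can be illustrated with the standard library alone. This is only a sketch of the calling pattern, not Snorkel Flow's implementation; `tokenizer_resources`, `count_tokens`, and the regex tokenizer are hypothetical stand-ins for a heavy resource such as a spaCy model.

```python
import re
from typing import Any, Callable, Dict, List

build_calls = 0  # counts how often the resource is constructed

def tokenizer_resources() -> Dict[str, Any]:
    # Factory in the style of a resources function: builds the resource on demand.
    global build_calls
    build_calls += 1
    token_rgx = re.compile(r"\w+")
    return {"tokenize": token_rgx.findall}

def count_tokens(texts: List[str], tokenize: Callable[[str], List[str]]) -> List[int]:
    # The keyword argument name matches the key returned by the factory.
    return [len(tokenize(text)) for text in texts]

# Nothing has been built until the factory is called.
built_before_call = build_calls
resources = tokenizer_resources()
counts = count_tokens(["a contract dated 2020", "ok"], **resources)
```

Passing a factory rather than the object defers construction to the place where the operator actually runs, which is the motivation given above for large resources.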
NOTE
Using custom or complex Python objects as resources is in beta and may lead to incompatibilities when executing the operator via the web UI.
Using other libraries in operators
The recommended way to use additional libraries is to import them inside the function body.
import pandas as pd
from snorkelflow.operators import pandas_operator
@pandas_operator(
    name="add_random_num", input_schema={}, output_schema={"rand_num": float},
)
def add_random_num(df: pd.DataFrame) -> pd.DataFrame:
    import random  # import here!
    df["rand_num"] = [random.random() for _ in range(len(df))]
    return df
Specifying input/output schemas for additional validation
For the @pandas_operator and @dask_operator decorators, an input_schema and/or output_schema can be specified to explicitly state the expected input/output types of the operator. Custom operators take a DataFrame as input and must return a DataFrame.
In this example, the input DataFrame is expected to have a column num_images with dtype int, and this operator will output a new DataFrame with a special_number column with dtype float.
import pandas as pd
from snorkelflow.operators import pandas_operator
@pandas_operator(
    name="special number generator",
    input_schema={"num_images": int},
    output_schema={"special_number": float},
)
def new_columns(df: pd.DataFrame) -> pd.DataFrame:
    df["special_number"] = df["num_images"].fillna(1) * 1.5
    return df
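A schema of this form can also be checked by hand in a notebook before registering the operator. The sketch below is a hypothetical helper written with plain pandas; it is not the validation Snorkel Flow performs, and it only understands int and float entries.

```python
from typing import Dict

import pandas as pd
from pandas.api import types as ptypes

def schema_problems(df: pd.DataFrame, schema: Dict[str, type]) -> Dict[str, str]:
    """Hypothetical helper: list columns that are missing or have an unexpected dtype."""
    checks = {int: ptypes.is_integer_dtype, float: ptypes.is_float_dtype}
    problems: Dict[str, str] = {}
    for column, expected in schema.items():
        if column not in df.columns:
            problems[column] = "missing"
        elif expected in checks and not checks[expected](df[column].dtype):
            problems[column] = f"expected {expected.__name__}, found {df[column].dtype}"
    return problems

df = pd.DataFrame({"num_images": [2, 0, 4]})
input_problems = schema_problems(df, {"num_images": int})

df["special_number"] = df["num_images"].fillna(1) * 1.5
output_problems = schema_problems(df, {"special_number": float})

# An integer column with a missing value is stored by pandas as float64.
with_gap = pd.DataFrame({"num_images": [2, None, 4]})
gap_problems = schema_problems(with_gap, {"num_images": int})
```

The last case is worth keeping in mind for the example above: if num_images can contain missing values, pandas will hold it as float64 unless a nullable integer dtype is used, so a check against int would flag it.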
Documentation on additional advanced options can be found in snorkelflow.operators.
Investigating issues
If you encounter any issues and want to understand the details, you can enable the debug mode of the Snorkel Flow SDK. For example, to investigate issues when using the operator, you can do the following:
# `ctx` is assumed to be the Snorkel Flow SDK context object already available in your notebook session
ctx.set_debug(True)
df_processed_in_pf = sf.get_node_output_data(application=APP_NAME, node=node, max_input_rows=10)
This shows the detailed error message and stack trace, which can help you fix the custom operator.