
Using custom data points

Snorkel Flow supports the use of custom data points (such as paragraphs, bounding boxes, or time ranges).

If you want to change the data point used to identify each row without changing the dataframe itself, you can use the ChangeDatapoint operator. It lets you specify the desired data point type and columns, but it won’t work if the set of desired data point columns doesn’t lead to a unique data point per row.

In that case, you'll have to write a custom operator (see Developing and registering custom operator classes) whose output dataframe uses the desired data point type and columns.
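To decide between the two approaches, you can quickly check in a notebook whether your candidate columns already identify each row uniquely. Here's a minimal pandas sketch of that check; the dataframe and column names are hypothetical placeholders:

import pandas as pd

# Hypothetical example dataframe -- replace with your own data.
df = pd.DataFrame({
    "doc_uid": [12, 12, 15],
    "page_uid": [2, 3, 4],
    "text": ["random text", "more text", "also random"],
})

candidate_cols = ["doc_uid", "page_uid"]

# If every row is unique under the candidate columns, ChangeDatapoint is
# sufficient; otherwise a custom operator is needed.
is_unique = not df.duplicated(subset=candidate_cols).any()
print(is_unique)  # True for this toy dataframe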

As an example, we’ll use custom operators to

  1. split all the documents into paragraphs, and
  2. reduce the separate paragraph predictions from our ML model into a single document prediction.

Data point format

In Snorkel, data point UIDs have the following format:

<datapoint_type>::<comma-separated list of values from the columns selected>

For example, here's a dataframe with its data point UIDs when the type is page and the selected columns are doc_uid and page_uid.

data point UID/index   doc_uid   page_uid   text
page::12,2             12        2          random text
page::15,4             15        4          also random
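For illustration, here's a minimal sketch of how the UIDs in the table above are formed by joining the selected column values onto the data point type (this mirrors the format, not the platform's internal implementation):

import pandas as pd

df = pd.DataFrame({
    "doc_uid": [12, 15],
    "page_uid": [2, 4],
    "text": ["random text", "also random"],
})

datapoint_type = "page"
datapoint_cols = ["doc_uid", "page_uid"]

# <datapoint_type>::<comma-separated list of values from the selected columns>
uids = datapoint_type + "::" + df[datapoint_cols].astype(str).agg(",".join, axis=1)
print(uids.tolist())  # ['page::12,2', 'page::15,4']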

Adding a data source

In this example, we’ll use the contract classification dataset. Follow the instructions under Upload Data in Document Classification and Classifying Contract Types.

Creating an application

Create a new application and select “Classification” as the type.

  1. Choose the dataset you created in the previous step.
  2. Give the application a name.
  3. “Select all” for the input schema.
  4. Choose “label” for the ground truth column.
  5. Click “Autofill Labels” to get the unique values from the label column.
  6. “Save” the application.

You should now see the Directed Acyclic Graph (DAG) of a basic classification application.

Writing a custom operator class

For more background on how to write custom operators, take a look at Developing and registering custom operator classes, and in particular the Using custom data points section. To add the custom operators, open the in-platform Notebook and execute the following code block. It adds a custom paragraph splitter, where a paragraph is defined as text separated by two newlines.

from typing import List, Type

import dask.dataframe as dd
import pandas as pd

from snorkelflow.operators import Operator
from snorkelflow.operators.operator import ColSchema
from snorkelflow.utils.datapoint import DatapointType

class CustomParagraphSplitter(Operator):

    def __init__(
        self,
        field: str,
        paragraph_idx_field: str = "paragraph_idx",
    ) -> None:
        self.field = field
        self.paragraph_idx_field = paragraph_idx_field
        self.paragraph_text_field = f'{field}_paragraph'
        self.new_datapoint_cols = [self.paragraph_idx_field]

    @property
    def input_schema(self) -> ColSchema:
        return {self.field: str, 'context_uid': int}

    @property
    def output_schema(self) -> ColSchema:
        return {self.paragraph_idx_field: int, self.paragraph_text_field: str}

    def get_datapoint_type(
        self, input_datapoint_types: List[Type[DatapointType]]
    ) -> Type[DatapointType]:
        """Get datapoint_type for output DataFrame of the operator, given types for inputs."""
        from snorkelflow.utils.datapoint import Datapoint
        return Datapoint

    def _execute(self, input_ddfs: List[dd.DataFrame]) -> dd.DataFrame:
        from snorkelflow.utils.datapoint import Datapoint

        def map_fn(input_df: pd.DataFrame) -> pd.DataFrame:
            import pandas as pd
            from snorkelflow.types.load import DATAPOINT_UID_COL

            def apply_fn(row: pd.Series) -> pd.Series:
                # Split the document text into paragraphs on double newlines
                paragraphs = row[self.field].split('\n\n')
                row[self.paragraph_idx_field] = list(range(len(paragraphs)))
                row[self.paragraph_text_field] = paragraphs
                return row

            df = input_df.apply(apply_fn, 1)
            # Explode the per-document paragraph lists into one row per paragraph
            df_paragraphs = df[[self.paragraph_idx_field, self.paragraph_text_field]].apply(pd.Series.explode).dropna()
            df_paragraphs = df_paragraphs.astype({self.paragraph_idx_field: int})
            df = df_paragraphs.join(input_df)

            # Set the new index and sort -- note that we're doing this on the pandas level
            datapoint_instance = Datapoint(["context_uid", self.paragraph_idx_field])
            df[DATAPOINT_UID_COL] = datapoint_instance.get_datapoint_uid_col_pandas(df)
            return df.set_index(DATAPOINT_UID_COL).sort_index()

        return self._execute_avoiding_set_index(input_ddfs[0], map_fn, Datapoint)

Note that in _execute, we call _execute_avoiding_set_index, which takes care of passing the correct meta to dask and setting the divisions. We'll see an example of doing this manually below when we write the reducer. _execute takes a list of input dask dataframes and returns a single dask dataframe.

The map_fn sets the index and sorts within each dask dataframe partition. Because the input dataframe is already sorted and we're only appending to the index, sorting within a partition guarantees that the resulting dataframe is also globally sorted. This allows us to avoid setting the index at the dask level, which is computationally expensive because it triggers a reshuffling of the data.
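To see why this works, note that the new UIDs only append a paragraph suffix to document UIDs that are already in sorted order, so sorting within a partition cannot move rows across document boundaries. A small standalone illustration in plain pandas, independent of Snorkel's internals:

import pandas as pd

# A partition whose rows are already sorted at the document level.
df = pd.DataFrame({
    "doc_uid": ["doc::1", "doc::1", "doc::2"],
    "paragraph_idx": [1, 0, 0],
})

# Appending the paragraph index to the existing UID only refines the order
# between rows of the same document; the document-level prefix order is kept.
df["uid"] = df["doc_uid"] + "," + df["paragraph_idx"].astype(str)
df = df.set_index("uid").sort_index()
print(df.index.tolist())  # ['doc::1,0', 'doc::1,1', 'doc::2,0']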

Now that we’ve defined the operator, we need to add it to the DAG.

import snorkelflow.client as sf

DATASET_NAME = ""
APP_NAME = ""

# register the operator
sf.add_operator_class(CustomParagraphSplitter, overwrite=True)

# add a node with the source as parent, and commit the CustomParagraphSplitter to it
ps_node_uid = sf.add_node(APP_NAME, [-1], None, add_to_parent_block=True)['node_uid']
sf.commit_builtin_operator(
    ps_node_uid, op_type='CustomParagraphSplitter', op_config={'field': 'text'}
)

Adding a model node

Next, add a model node that will take the output of the CustomParagraphSplitter and predict on it. This can be done by executing:

# add a model node after the CustomParagraphSplitter
model_node_uid = sf.add_block_to_application(
    application=APP_NAME,
    template_id="clf",
    input_node=ps_node_uid,
    block_config=dict(label_map={'POS': 1, 'NEG': 0, 'UNKNOWN': -1}, label_col=None),
)['node_uids'][0]

Writing a custom reducer

We'll now add a reducer that outputs a document-level prediction based on the predictions for its paragraphs. In this example, we'll use the most common paragraph prediction in the document as that document's prediction.
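Concretely, that reduction is a groupby on the document UID with a mode aggregation over the paragraph predictions; pandas' mode() can return several values on ties, so we take the first one. A minimal standalone sketch of just that step, with made-up predictions:

import pandas as pd

# Hypothetical paragraph-level predictions for two documents.
df = pd.DataFrame({
    "context_uid": [1, 1, 1, 2, 2],
    "preds": [1, 0, 1, 0, 0],
})

# Most common paragraph prediction per document.
doc_preds = (
    df.groupby("context_uid")["preds"]
    .agg(lambda s: s.mode().iloc[0])
    .reset_index()
)
print(doc_preds)
#    context_uid  preds
# 0            1      1
# 1            2      0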

As before, we first define the custom reducer:

from typing import Dict, List, Optional

import dask.dataframe as dd
import pandas as pd

from snorkelflow.operators import Operator
from snorkelflow.operators.operator import ColSchema
from snorkelflow.utils.datapoint import DatapointType

class CustomParagraphReducer(Operator):

    def __init__(
        self,
        field: str,
        paragraph_idx_field: str = "paragraph_idx",
    ) -> None:
        self.paragraph_idx_field = paragraph_idx_field
        self.paragraph_text_field = f'{field}_paragraph'

    @property
    def input_schema(self) -> ColSchema:
        return {self.paragraph_idx_field: int, self.paragraph_text_field: str, 'preds': int}

    @property
    def output_schema(self) -> ColSchema:
        return {}

    @property
    def drop_schema(self) -> Optional[List[str]]:
        return [self.paragraph_idx_field, self.paragraph_text_field]

    def get_datapoint_instance(
        self, input_datapoint_instances: List[DatapointType]
    ) -> DatapointType:
        from snorkelflow.utils.datapoint import DocDatapoint
        return DocDatapoint()  # by default, DocDatapoint columns are ["context_uid"]

    def _execute(self, input_ddfs: List[dd.DataFrame]) -> dd.DataFrame:
        from snorkelflow.types.load import DATAPOINT_UID_COL
        from snorkelflow.utils.datapoint import DocDatapoint

        def map_fn(input_df: pd.DataFrame) -> pd.DataFrame:
            import pandas as pd

            # Calculate the doc-level prediction as the paragraph prediction that appears most often
            df_scores = input_df[['context_uid', 'preds']].groupby(['context_uid']).agg(lambda l: pd.Series.mode(l).iloc[0]).reset_index()
            df_rest = input_df[[col for col in input_df.columns if col not in [self.paragraph_idx_field, self.paragraph_text_field, 'preds']]]
            df_rest = df_rest.groupby(['context_uid']).agg(lambda x: x.iloc[0])
            df = df_rest.merge(df_scores, on='context_uid')

            # Set the new index and sort
            datapoint_instance = DocDatapoint()
            df[DATAPOINT_UID_COL] = datapoint_instance.get_datapoint_uid_col_pandas(df)
            return df.set_index(DATAPOINT_UID_COL).sort_index()

        ddf = input_ddfs[0]
        # The meta has to contain the columns in the same order as the result of map_fn
        meta: Dict[str, type] = {'context_uid': int}
        meta.update(**{k: v for k, v in ddf.dtypes.to_dict().items() if k not in [self.paragraph_idx_field, self.paragraph_text_field, 'preds']})
        meta['preds'] = object
        ddf = ddf.map_partitions(map_fn, meta=meta)
        # Set the divisions (this allows us to avoid setting the index at the dask level)
        ddf.divisions = DocDatapoint.get_new_divisions(ddf.divisions, 1)
        return ddf

A few things to note:

  • Only dropping data point columns from the end is permitted, as the resulting dataframe will still be sorted in that case (see the sketch after this list). In this example, the input dataframe contains ["context_uid", "paragraph_idx"] as data point columns, and we dropped paragraph_idx. We couldn't drop context_uid.
  • In _execute, when calling map_partitions on the dask dataframe, we have to pass in the meta of the output dataframe. The column order matters and has to match the column order of the dataframe returned by map_fn.
  • If dropping data point columns, ensure it doesn’t result in duplicate values in the index. Here, the groupby guarantees that.
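Here's a small standalone illustration of why only trailing data point columns can be dropped; the UIDs are written out as plain strings and the example is independent of Snorkel's internals:

# Sorted paragraph-level UIDs of the form <type>::<context_uid>,<paragraph_idx>
uids = ["doc::1,0", "doc::1,1", "doc::2,0", "doc::3,0"]

# Dropping the trailing column (paragraph_idx) keeps the prefix order,
# so the resulting document-level UIDs are still sorted.
doc_uids = ["doc::" + u.split("::")[1].split(",")[0] for u in uids]
print(doc_uids == sorted(doc_uids))  # True

# Dropping the leading column (context_uid) instead keeps only the paragraph
# index, which is not sorted across documents in general.
para_uids = ["doc::" + u.split(",")[1] for u in uids]
print(para_uids)                     # ['doc::0', 'doc::1', 'doc::0', 'doc::0']
print(para_uids == sorted(para_uids))  # False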

Then we add it to the DAG after the model node:

# register the reducer
sf.add_operator_class(CustomParagraphReducer, overwrite=True)

# add a node after the model one and commit the CustomParagraphReducer to it
pr_node_uid = sf.add_node(APP_NAME, [model_node_uid], None, add_to_parent_block=True)['node_uid']
sf.commit_builtin_operator(
    pr_node_uid, op_type='CustomParagraphReducer', op_config={'field': 'text'}
)

Iteratively develop the ML model

If you go back to the DAG now, you will see the new custom operators that we added.

Click on the model node that comes after the CustomParagraphSplitter to go to the Label page. From here, you can follow the standard process of adding labeling functions (LFs), programmatically labeling the dataset, and training a model. If you're unsure how to do that, you can refer to those steps in the Document Classification and Classifying Contract Types tutorial.