Version: 0.96

Example notebooks

The in-platform Notebook server, accessible via the Notebook button in the sidebar of the web UI, has a number of example notebooks under the SampleNotebooks directory in the File Browser.

Create a new dataset and add datasources

The following snippet is an example workflow using snorkelflow.client to create a new dataset and add datasources for different splits.

import os
import snorkelflow.client as sf

# Configure client context for Snorkel Flow instance
ctx = sf.SnorkelFlowContext.from_kwargs()

# Create a dataset called ``contracts``
DATASET_NAME = "contracts"
sf.create_dataset(DATASET_NAME)

# Add datasources for the train, valid, and test splits. The files have been uploaded to
# MinIO in the snorkel-contracts-dataset bucket as Parquet files with a UID column called uid.
sf.create_datasource(
    dataset=DATASET_NAME,
    path="s3://snorkel-contracts-dataset/train.parquet",
    file_type="parquet",
    uid_col="uid",
    split="train",
)
sf.create_datasource(
    dataset=DATASET_NAME,
    path="s3://snorkel-contracts-dataset/dev.parquet",
    file_type="parquet",
    uid_col="uid",
    split="train",
)
sf.create_datasource(
    dataset=DATASET_NAME,
    path="s3://snorkel-contracts-dataset/valid.parquet",
    file_type="parquet",
    uid_col="uid",
    split="valid",
)
sf.create_datasource(
    dataset=DATASET_NAME,
    path="s3://snorkel-contracts-dataset/test.parquet",
    file_type="parquet",
    uid_col="uid",
    split="test",
)
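The four calls above differ only in file name and split, so their arguments can be generated from a small mapping. The following is a minimal sketch in plain Python: build_datasource_specs is a hypothetical helper (not part of the SDK), and the final loop is left as a comment because it needs a configured Snorkel Flow client.

```python
from typing import Dict, List


def build_datasource_specs(
    dataset: str, bucket_url: str, file_to_split: Dict[str, str]
) -> List[Dict[str, str]]:
    """Return one dict of create_datasource keyword arguments per file."""
    return [
        {
            "dataset": dataset,
            "path": f"{bucket_url}/{file_name}",
            "file_type": "parquet",
            "uid_col": "uid",
            "split": split,
        }
        for file_name, split in file_to_split.items()
    ]


specs = build_datasource_specs(
    dataset="contracts",
    bucket_url="s3://snorkel-contracts-dataset",
    file_to_split={
        "train.parquet": "train",
        "dev.parquet": "train",
        "valid.parquet": "valid",
        "test.parquet": "test",
    },
)

# With a configured client, each spec could be passed through unchanged:
# for spec in specs:
#     sf.create_datasource(**spec)
```

Keeping the file-to-split mapping in one place makes it easier to spot a file that was assigned to an unintended split.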

Create an application

Now that we have a dataset with datasources, let’s create an application. Here we create a text extraction application to extract dates from the “text” field of the data.

import snorkelflow.client as sf

APP_NAME = "date_extraction"

# Create a text extraction application.
# This will create an application with a block of operators suited for text extraction.
app = sf.create_text_extraction_application(
    application_name=APP_NAME,
    dataset=DATASET_NAME,
    input_schema=["url", "text"],
    extraction_field="text",  # Field from which spans will be extracted.
    labels=["label1", "label2"],
)

# Get the UID of the node "SpanExtractor" in the DAG.
extractor_node = sf.get_node_uid(APP_NAME, search_op_type="SpanExtractor")[0]

# Commit "DateSpanExtractor" to the node
sf.commit_builtin_operator(
    node=extractor_node,
    op_type="DateSpanExtractor",
    op_config={"field": "text"},
)

# Get the UID of the node "SpanReducer" in the DAG.
reducer_node = sf.get_node_uid(APP_NAME, search_op_type="SpanReducer")[0]

# Commit "IdentityReducer" to the node
sf.commit_builtin_operator(
    node=reducer_node,
    op_type="IdentityReducer",
    op_config={},
)

# Optionally, add "DateSpanNormalizer" after "DateSpanExtractor" node.
preview_node = sf.get_node_uid(APP_NAME, search_op_type="SpanPreviewPreprocessor")[0]

sf.add_node(
    application=APP_NAME,
    input_node_uids=[extractor_node],
    output_node_uid=preview_node,
    expected_op_type="Featurizer",
    op_type="DateSpanNormalizer",
)

Alternatively, you can create the same application using lower-level methods.

import snorkelflow.client as sf
from snorkelflow.types.workflows import INPUT_NODE_ID

# Create an application without any block.
sf.create_application(
    application_name=APP_NAME,
    dataset=DATASET_NAME,
    input_schema=["url", "text"],
)

# Add a block of operators using the application template "te" (Text Extraction).
sf.add_block_to_application(
    application=APP_NAME,
    template_id="te",
    input_node=INPUT_NODE_ID,
    block_config=dict(
        label_map=dict(UNKNOWN=-1, NEGATIVE=0, POSITIVE=1),
        extraction_field="text",  # Field from which spans will be extracted.
    ),
)

# Get the application info
app = sf.get_application(APP_NAME)

# Run the same commands as above to commit "DateSpanExtractor" and "IdentityReducer"

Train a custom model using a Snorkel Flow training dataset

The following snippet is an example workflow to integrate a custom machine learning pipeline with the Snorkel Flow platform. Using the application in Quick Start: End-to-End Document Classification as an example, we will first get a training dataset created in Snorkel Flow, then train an ML model, and finally upload the predictions to Snorkel Flow in order to use the integrated analysis tools. Here, we use LogisticRegression and CountVectorizer from scikit-learn as an example, but you can use any model and featurizer of your choice.

import snorkelflow.client as sf
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.linear_model import LogisticRegression

APP_NAME = "YOUR_APP_NAME" # Replace with your application name

feature_column = "text"
training_set_uid = 1
application_uid = sf.get_application_uid(name=APP_NAME)
node = sf.get_model_node(application_uid)

# Get the dataframe of each split and set aside the training set labels of the train split
df_train = sf.get_node_data(
    node=node,
    split="train",
    training_set_labels=True,
    training_set_uid=training_set_uid,
    training_set_filter_unlabeled=True,
)
df_dev = sf.get_node_data(node=node, split="dev")
df_valid = sf.get_node_data(node=node, split="valid")
df_test = sf.get_node_data(node=node, split="test")
y_train = df_train.pop("training_set_labels")

# Train a featurizer on the feature column
vectorizer = CountVectorizer()
X_train = vectorizer.fit_transform(df_train[feature_column])

# Train a custom model on the training dataset
model = LogisticRegression().fit(X_train, y_train)

# Register the custom model with Snorkel Flow.
# The training_set argument is the training_set_uid defined above.
model_uid = sf.register_model(
    node=node,
    description="My custom model",
    training_set=training_set_uid,
)

# For each split, get a feature matrix, get predictions, and add them to Snorkel Flow
for df in (df_dev, df_valid, df_test):
    X = vectorizer.transform(df[feature_column])
    preds = model.predict(X)
    x_uids = df.index.tolist()
    sf.add_predictions(
        node=node,
        x_uids=x_uids,
        predicted_labels=preds,
        model_uid=model_uid,
    )

Load data with labels/annotations

The example above loaded data together with training labels. The same call can attach other metadata to the dataframe as well: ground truth labels, predictions, and tags. The snippets below reuse the node and model_uid variables from the previous example.

import snorkelflow.client as sf

# Load data with training labels
df_train = sf.get_node_data(
    node=node,
    split="train",
    training_set_labels=True,
    training_set_uid=1,
    training_set_sampler_config={"strategy": "auto"},
)

# Load data with tags
df_dev = sf.get_node_data(
    node=node,
    split="dev",
    tags=True,
)

# Load data with ground truth labels and predictions
df_test = sf.get_node_data(
    node=node,
    split="test",
    ground_truth=True,  # True by default but explicitly set for illustration purposes.
    model_uid=model_uid,
    model_predictions=True,
)

Generating labeling functions in the notebook

Labeling functions are defined using the @labeling_function decorator, which wraps a Python function that takes in a Pandas Series object representing an individual data point as input and returns a string representing the label.

The following example shows a labeling function looking for the substring "xyz" in a field of the data. It returns the label "LABEL" if it finds the substring, and otherwise returns the special label "UNKNOWN".

from snorkel.labeling.lf import labeling_function

@labeling_function(name="my_lf")
def lf(x):
    if "xyz" in x.field_name:
        return "LABEL"
    return "UNKNOWN"

# Save to an application
sf.add_code_lf(node, lf, label="LABEL")  # Default applied to split: "dev"

In a real LF, the label string (here, "LABEL") needs to be replaced with the string of the label configured for the application.

note

Once you’ve saved your LF, you will immediately see returned statistics from applying it to the dev set. If you click the Update Statistics button on the left pane of the Label page, you can also see the LF in the LF Summary pane.
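Before saving an LF, it can help to exercise its rule locally on a couple of hand-made rows. The sketch below repeats the substring rule from the example above without the decorator, so it runs without the SDK; lf_logic is a hypothetical name, and SimpleNamespace is used only as a stand-in for a Pandas Series row, since both support attribute access such as x.field_name.

```python
from types import SimpleNamespace


def lf_logic(x):
    """Same rule as the labeling function above, without the decorator."""
    if "xyz" in x.field_name:
        return "LABEL"
    return "UNKNOWN"


# Two hand-made rows: one that should match and one that should not.
rows = [
    SimpleNamespace(field_name="contains xyz here"),
    SimpleNamespace(field_name="no match"),
]
labels = [lf_logic(row) for row in rows]
print(labels)  # ['LABEL', 'UNKNOWN']
```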

Use variables or imports from outside the LF’s scope

Labeling functions can also rely on external resources, such as functions or objects defined outside the labeling function, as well as external libraries. To pass a function or object to the labeling function, add it to the resources argument of the decorator and add a matching keyword argument to the function signature (find_word_index in the example below). The recommended way to use additional libraries is to import them in the function body.

from snorkel.labeling.lf import labeling_function

def find_word_index(text):
    import numpy

    # Position of the first occurrence of "employee", or an empty array if it is absent
    try:
        idx = numpy.array([text.split(" ").index("employee")])
    except ValueError:
        idx = numpy.array([])
    return idx

@labeling_function(name="my_lf", resources=dict(find_word_index=find_word_index))
def lf(x, find_word_index):
    import numpy

    idx = find_word_index(x.text)
    # Check for the empty array first, because its mean is NaN
    if idx.size > 0 and numpy.mean(idx) <= 1000:
        return "employment"
    else:
        return "UNKNOWN"

sf.add_code_lf(node, lf, label="employment")

note

While resources can be a nested dictionary, functions should be its direct children; otherwise, they may break when Snorkel Flow upgrades to a newer version of Python. See @labeling_function for more details.

Installing additional NLTK packages for use in an LF or operator

Snorkel Flow comes with the popular and vader_lexicon NLTK packages pre-installed. If you need additional NLTK packages, you can install them into the CUSTOM_NLTK_PATH directory. After installation, all code LFs can use them. A simple example follows.

from typing import Callable, Dict

import nltk
import pandas as pd
from snorkelflow.studio import resources_fn_labeling_function
from snorkelflow.utils.file import CUSTOM_NLTK_PATH

nltk.download("punkt", download_dir=CUSTOM_NLTK_PATH)
nltk.download("averaged_perceptron_tagger", download_dir=CUSTOM_NLTK_PATH)

def get_nltk_tokenizer_parser() -> Dict:
    import nltk
    return {"tokenize": nltk.word_tokenize, "parse": nltk.pos_tag}

@resources_fn_labeling_function(
    name="lf_1", resources_fn=get_nltk_tokenizer_parser,
)
def lf_1(x: pd.Series, tokenize: Callable, parse: Callable) -> str:
    pos_pairs = parse(tokenize(x.span_preview))
    # Label the span as soon as a proper noun (POS tag "NNP") is found
    for token, pos in pos_pairs:
        if pos == "NNP":
            return "employment"
    return "UNKNOWN"

sf.add_code_lf(node, lf_1, label="employment")

Sampler configs for data loading and model training

Snorkel Flow offers a variety of options for data sampling when loading datasets or training models. This is particularly useful when loading large or unbalanced datasets. Sampling options are passed via a sampler configuration in the Python SDK for functions like get_node_data and train_model. The sampler configuration is a dictionary with three keys: strategy (required), params (optional), and class_counts (optional). The options for params and class_counts vary by sampling strategy.
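The dictionary shape described above can be checked before a config is handed to the SDK. The following is a minimal sketch, assuming the five strategy names documented on this page are the complete set; check_sampler_config is a hypothetical helper and not part of snorkelflow.

```python
from typing import Any, Dict

# Assumption: the strategies documented on this page are the full set.
KNOWN_STRATEGIES = {"none", "fixed", "class_count", "target_proportion", "auto"}
ALLOWED_KEYS = {"strategy", "params", "class_counts"}


def check_sampler_config(config: Dict[str, Any]) -> None:
    """Raise ValueError if the config does not match the documented shape."""
    unknown = set(config) - ALLOWED_KEYS
    if unknown:
        raise ValueError(f"unexpected keys: {sorted(unknown)}")
    if "strategy" not in config:
        raise ValueError("'strategy' is required")
    if config["strategy"] not in KNOWN_STRATEGIES:
        raise ValueError(f"unknown strategy: {config['strategy']!r}")
    if not isinstance(config.get("params", {}), dict):
        raise ValueError("'params' must be a dictionary")


# A well-formed config passes silently.
check_sampler_config({"strategy": "fixed", "params": {"p": 0.5}})
```

A check like this catches misspelled keys early, before a data load or training job is started.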

none

No sampling is performed.

Example

sampler_config = {"strategy": "none"}

fixed

Sample a fixed fraction of each class.

Params

  • p: fraction of each class to sample (must be > 0 and <= 1)

Example

sampler_config = {"strategy": "fixed", "params": {"p": 0.5}}

class_count

Sample the provided number of data points for each class. Classes are upsampled or downsampled to match the target counts if needed.

Class counts: the number of samples per class

Example

sampler_config = {"strategy": "class_count", "class_counts": [100, 200, 100]}
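To make the upsampling and downsampling behavior concrete, here is an illustrative sketch in plain Python. It is not the SDK's implementation: resample_to_counts is a hypothetical function, and it assumes that classes are integer labels and that class_counts[c] is the target for class c.

```python
import random
from collections import defaultdict
from typing import Dict, List, Sequence


def resample_to_counts(
    labels: Sequence[int], class_counts: Sequence[int], seed: int = 0
) -> List[int]:
    """Return row indices so that class c appears class_counts[c] times.

    Classes with too few rows are upsampled (drawn with replacement);
    classes with enough rows are downsampled (drawn without replacement).
    """
    rng = random.Random(seed)
    rows_by_class: Dict[int, List[int]] = defaultdict(list)
    for idx, label in enumerate(labels):
        rows_by_class[label].append(idx)

    sampled: List[int] = []
    for cls, target in enumerate(class_counts):
        rows = rows_by_class.get(cls, [])
        if not rows or target == 0:
            continue  # nothing to draw from, or nothing requested
        if target <= len(rows):
            sampled.extend(rng.sample(rows, target))
        else:
            sampled.extend(rng.choices(rows, k=target))
    return sampled


# 5 rows of class 0, 50 of class 1, 10 of class 2
labels = [0] * 5 + [1] * 50 + [2] * 10
indices = resample_to_counts(labels, class_counts=[10, 20, 10])
```

In this toy input, class 0 is upsampled from 5 to 10 rows, class 1 is downsampled from 50 to 20, and class 2 keeps its 10 rows.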

target_proportion

Attempt to match the provided class proportions. Upsample classes to match the target proportions if needed.

Class counts: unnormalized proportions for each class

Params

  • max_sample_multiple: the sampled dataset will be no more than max_sample_multiple times the size of the original dataset

Example

sampler_config = {
    "strategy": "target_proportion",
    "params": {"max_sample_multiple": 5},
    "class_counts": [0.333, 0.333, 0.333],
}
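The interaction between the proportions and max_sample_multiple can be illustrated with a small calculation. This page does not spell out the exact algorithm, so the sketch below shows one plausible reading under stated assumptions: classes are only upsampled, the total grows until the proportions are met, and the total is then capped. target_counts is a hypothetical function.

```python
from typing import List, Sequence


def target_counts(
    current_counts: Sequence[int],
    proportions: Sequence[float],
    max_sample_multiple: float,
) -> List[int]:
    """Per-class sample counts that follow `proportions` (one plausible reading).

    The total is the smallest size at which no class has to shrink, capped at
    max_sample_multiple times the original size. Counts are rounded, so the
    sum can differ from the capped total by a small rounding error.
    """
    total_weight = sum(proportions)
    normalized = [p / total_weight for p in proportions]
    original_total = sum(current_counts)
    # Smallest total for which no class is downsampled: max over n_c / p_c.
    needed_total = max(
        n / p for n, p in zip(current_counts, normalized) if p > 0
    )
    capped_total = min(needed_total, max_sample_multiple * original_total)
    return [round(p * capped_total) for p in normalized]


# An unbalanced dataset with 900, 50, and 50 rows per class
counts = target_counts([900, 50, 50], [0.333, 0.333, 0.333], max_sample_multiple=5)
capped = target_counts([900, 50, 50], [1, 1, 1], max_sample_multiple=2)
```

Here the uncapped call would need 2,700 rows in total to balance the classes without discarding any of the majority class, while the second call is limited to about 2,000 rows by the cap.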

auto

Attempt to match the ground truth class proportions of the valid set. Upsample classes to match the target proportions if needed.

Params

  • max_sample_multiple: the sampled dataset will be no more than max_sample_multiple times the size of the original dataset
  • min_total: minimum total number of valid set labels to perform sampling; if not met, no sampling occurs
  • min_per_class: minimum number of valid set labels for each class to perform sampling; if not met, no sampling occurs

Example

sampler_config = {
    "strategy": "auto",
    "params": {"max_sample_multiple": 5, "min_total": 100},
}
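The two thresholds act as a gate: if the valid set has too few labels, sampling is skipped. A minimal sketch of that gate follows, assuming the target proportions are simple label frequencies in the valid set; valid_set_proportions is a hypothetical function, and the "services" label is made up for illustration.

```python
from collections import Counter
from typing import Dict, Hashable, Optional, Sequence


def valid_set_proportions(
    valid_labels: Sequence[Hashable],
    min_total: int = 0,
    min_per_class: int = 0,
) -> Optional[Dict[Hashable, float]]:
    """Return class proportions of the valid set, or None if a threshold is not met."""
    counts = Counter(valid_labels)
    total = sum(counts.values())
    if total == 0 or total < min_total:
        return None  # too few labels overall: skip sampling
    # Note: only classes that appear in valid_labels are checked here.
    if any(n < min_per_class for n in counts.values()):
        return None  # at least one class is too sparse: skip sampling
    return {cls: n / total for cls, n in counts.items()}


valid_labels = ["employment"] * 75 + ["services"] * 25
proportions = valid_set_proportions(valid_labels, min_total=100, min_per_class=10)
skipped = valid_set_proportions(valid_labels, min_total=200)
```

With 100 valid set labels, the first call returns proportions of 0.75 and 0.25, while the second call returns None because min_total is not met.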