Example notebooks
The in-platform Notebook server, accessible via the Notebook button in the sidebar of the web UI, has a number of example notebooks under the SampleNotebooks directory in the File Browser.
Create a new dataset and add datasources
The following snippet is an example workflow that uses snorkelflow.client to create a new dataset and add datasources for different splits.
import os
import snorkelflow.client as sf

# Configure client context for Snorkel Flow instance
ctx = sf.SnorkelFlowContext.from_kwargs()

# Create a dataset called ``contracts``
DATASET_NAME = "contracts"
sf.create_dataset(DATASET_NAME)

# Add a datasource for the train, valid, and test splits which have been uploaded to
# MinIO in the contracts_data bucket as Parquet files with UID columns called uid
sf.create_datasource(
    dataset=DATASET_NAME,
    path="s3://snorkel-contracts-dataset/train.parquet",
    file_type="parquet",
    uid_col="uid",
    split="train",
)
sf.create_datasource(
    dataset=DATASET_NAME,
    path="s3://snorkel-contracts-dataset/dev.parquet",
    file_type="parquet",
    uid_col="uid",
    split="train",
)
sf.create_datasource(
    dataset=DATASET_NAME,
    path="s3://snorkel-contracts-dataset/valid.parquet",
    file_type="parquet",
    uid_col="uid",
    split="valid",
)
sf.create_datasource(
    dataset=DATASET_NAME,
    path="s3://snorkel-contracts-dataset/test.parquet",
    file_type="parquet",
    uid_col="uid",
    split="test",
)
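The four calls above differ only in file name and split, so the arguments can also be generated from a mapping. The following is a minimal sketch, not part of the SDK: the helper name datasource_kwargs and the BUCKET constant are hypothetical, and the call to sf.create_datasource is left as a comment so that the snippet runs without a Snorkel Flow instance.

```python
# Hypothetical helper: builds keyword arguments for sf.create_datasource.
DATASET_NAME = "contracts"
BUCKET = "s3://snorkel-contracts-dataset"  # same bucket as in the calls above

# File name -> split, mirroring the four calls above
# (dev.parquet is registered under the train split there).
FILE_TO_SPLIT = {
    "train.parquet": "train",
    "dev.parquet": "train",
    "valid.parquet": "valid",
    "test.parquet": "test",
}


def datasource_kwargs(dataset, bucket, file_to_split):
    """Return one kwargs dict per file, in insertion order."""
    return [
        dict(
            dataset=dataset,
            path=f"{bucket}/{filename}",
            file_type="parquet",
            uid_col="uid",
            split=split,
        )
        for filename, split in file_to_split.items()
    ]


all_kwargs = datasource_kwargs(DATASET_NAME, BUCKET, FILE_TO_SPLIT)
for kwargs in all_kwargs:
    # With a configured client, each dict could be passed as:
    # sf.create_datasource(**kwargs)
    print(kwargs["path"], "->", kwargs["split"])
```

Keeping the file-to-split mapping in one place makes it easier to spot a file registered under an unintended split.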
Create an application
Now that we have a dataset with datasources, let’s create an application. Here we create a text extraction application to extract dates from the “text” field of the data.
import snorkelflow.client as sf

APP_NAME = "date_extraction"

# Create a text extraction application.
# This will create an application with a block of operators suited for text extraction.
app = sf.create_text_extraction_application(
    application_name=APP_NAME,
    dataset=DATASET_NAME,
    input_schema=["url", "text"],
    extraction_field="text", # specify the field from which spans will be extracted.
    labels=["label1", "label2"],
)

# Get the UID of the node "SpanExtractor" in the DAG.
extractor_node = sf.get_node_uid(APP_NAME, search_op_type="SpanExtractor")[0]

# Commit "DateSpanExtractor" to the node
sf.commit_builtin_operator(
    node=extractor_node,
    op_type="DateSpanExtractor",
    op_config={"field": "text"},
)

# Get the UID of the node "SpanReducer" in the DAG.
reducer_node = sf.get_node_uid(APP_NAME, search_op_type="SpanReducer")[0]

# Commit "IdentityReducer" to the node
sf.commit_builtin_operator(
    node=reducer_node,
    op_type="IdentityReducer",
    op_config={},
)

# Optionally, add "DateSpanNormalizer" after "DateSpanExtractor" node.
preview_node = sf.get_node_uid(APP_NAME, search_op_type="SpanPreviewPreprocessor")[0]
sf.add_node(
    application=APP_NAME,
    input_node_uids=[extractor_node],
    output_node_uid=preview_node,
    expected_op_type="Featurizer",
    op_type="DateSpanNormalizer",
)
Alternatively, you can create the same application using lower-level methods.
import snorkelflow.client as sf
from snorkelflow.types.workflows import INPUT_NODE_ID

# Create an application without any block.
sf.create_application(
    application_name=APP_NAME,
    dataset=DATASET_NAME,
    input_schema=["url", "text"],
)

# Add a block of operators using the application template "te" (Text Extraction).
sf.add_block_to_application(
    application=APP_NAME,
    template_id="te",
    input_node=INPUT_NODE_ID,
    block_config=dict(
        label_map=dict(UNKNOWN=-1, NEGATIVE=0, POSITIVE=1),
        extraction_field="text", # specify the field from which spans will be extracted.
    ),
)

# Get the application info
app = sf.get_application(APP_NAME)

# Run the same commands as above to commit "DateSpanExtractor" and "IdentityReducer"
Train a custom model using a Snorkel Flow training dataset
The following snippet is an example workflow to integrate a custom machine learning pipeline with the Snorkel Flow platform. Using the application in Quick Start: End-to-End Document Classification as an example, we will first get a training dataset created in Snorkel Flow, then train an ML model, and finally upload the predictions to Snorkel Flow in order to use the integrated analysis tools. Here, we use LogisticRegression and CountVectorizer from scikit-learn as an example, but you can use any model and featurizer of your choice.
import snorkelflow.client as sf
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.linear_model import LogisticRegression

APP_NAME = "YOUR_APP_NAME" # Replace with your application name
feature_column = "text"
training_set_uid = 1
application_uid = sf.get_application_uid(name=APP_NAME)
node = sf.get_model_node(application_uid)

# Get the dataframe of each split and set aside the training set labels of the train split
df_train = sf.get_node_data(
    node=node,
    split="train",
    training_set_labels=True,
    training_set_uid=training_set_uid,
    training_set_filter_unlabeled=True,
)
df_dev = sf.get_node_data(node=node, split="dev")
df_valid = sf.get_node_data(node=node, split="valid")
df_test = sf.get_node_data(node=node, split="test")
y_train = df_train.pop("training_set_labels")

# Train a featurizer on the feature column
vectorizer = CountVectorizer()
X_train = vectorizer.fit_transform(df_train[feature_column])

# Train a custom model on the training dataset
model = LogisticRegression().fit(X_train, y_train)

# Register the custom model with Snorkel Flow
# training_set argument is the training_set_uid as defined above
model_uid = sf.register_model(
    node=node,
    description="My custom model",
    training_set=training_set_uid,
)

# For each split, get a feature matrix, get predictions, and add them to Snorkel Flow
for df in (df_dev, df_valid, df_test):
    X = vectorizer.transform(df[feature_column])
    preds = model.predict(X)
    x_uids = df.index.tolist()
    sf.add_predictions(
        node=node,
        x_uids=x_uids,
        predicted_labels=preds,
        model_uid=model_uid,
    )
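The key pattern above is that the featurizer is fitted on the train split only and then reused, unchanged, on the other splits. The sketch below illustrates that pattern with a tiny pure-Python word counter. TinyCountVectorizer is a hypothetical, simplified stand-in for scikit-learn's CountVectorizer (it only lowercases and splits on whitespace), not the real class.

```python
class TinyCountVectorizer:
    """Hypothetical, simplified stand-in for a bag-of-words featurizer."""

    def fit(self, texts):
        # Learn the vocabulary from the training texts only.
        words = sorted({w for t in texts for w in t.lower().split()})
        self.vocabulary_ = {w: i for i, w in enumerate(words)}
        return self

    def transform(self, texts):
        # Count known words; words unseen during fit are ignored.
        rows = []
        for t in texts:
            row = [0] * len(self.vocabulary_)
            for w in t.lower().split():
                if w in self.vocabulary_:
                    row[self.vocabulary_[w]] += 1
            rows.append(row)
        return rows


train_texts = ["loan agreement", "employment agreement"]
test_texts = ["employment contract agreement agreement"]

vectorizer = TinyCountVectorizer().fit(train_texts)
X_train = vectorizer.transform(train_texts)
X_test = vectorizer.transform(test_texts)

print(vectorizer.vocabulary_)
print(X_train)
print(X_test)
```

Because "contract" never appears in the train texts, it contributes nothing to X_test. Calling fit again on another split would change the column layout and make the trained model's inputs inconsistent.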
Load data with labels/annotations
In the example above, data was loaded with training labels. Here, we show how to load other metadata (ground truth labels, training labels, predictions, and/or tags) into the dataframe.
import snorkelflow.client as sf

# Load data with training labels
df_train = sf.get_node_data(
    node=node,
    split="train",
    training_set_labels=True,
    training_set_uid=1,
    training_set_sampler_config={"strategy": "auto"},
)

# Load data with tags
df_dev = sf.get_node_data(
    node=node,
    split="dev",
    tags=True,
)

# Load data with ground truth labels and predictions
df_test = sf.get_node_data(
    node=node,
    split="test",
    ground_truth=True, # True by default but explicitly set for illustration purposes.
    model_uid=model_uid,
    model_predictions=True,
)
Generating labeling functions in the notebook
Labeling functions are defined using the @labeling_function decorator, which wraps a Python function that takes a Pandas Series object representing an individual data point as input and returns a string representing the label.
The following example shows a labeling function that looks for the substring "xyz" in a field of the data. It returns the label "LABEL" if it finds the substring, and otherwise returns the special label "UNKNOWN".
from snorkel.labeling.lf import labeling_function

@labeling_function(name="my_lf")
def lf(x):
    if "xyz" in x.field_name:
        return "LABEL"
    return "UNKNOWN"

# Save to an application
sf.add_code_lf(node, lf, label="LABEL") # Default applied to split: "dev"
In a real LF, the label string (here, "LABEL") needs to be replaced with the string of the label configured for the application.
Once you’ve saved your LF, you will immediately see returned statistics from applying it to the dev set. If you click the Update Statistics button on the left pane of the Label page, you can also see the LF in the LF Summary pane.
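Before saving an LF, it can help to sanity-check its logic on a few hand-made records. The following is a minimal sketch under stated assumptions: it tests a plain, undecorated function, uses types.SimpleNamespace as a stand-in for the Pandas Series that the platform passes in, and reuses the placeholder names field_name and "LABEL" from the example above.

```python
from types import SimpleNamespace


def lf_logic(x):
    """Same body as the LF above, without the decorator."""
    if "xyz" in x.field_name:
        return "LABEL"
    return "UNKNOWN"


# Hand-made records; SimpleNamespace only mimics attribute access (x.field_name).
records = [
    SimpleNamespace(field_name="abc xyz def"),
    SimpleNamespace(field_name="no match here"),
]

votes = [lf_logic(r) for r in records]
print(votes)
```

A check like this catches logic errors early, but it does not replace the statistics computed by the platform on the dev set.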
Use variables or imports from outside the LF’s scope
Labeling functions can also rely on external resources, such as functions or objects defined outside of the labeling function, as well as external libraries.
To pass functions and objects to the labeling function, supply them through the resources argument of the decorator and add a matching keyword argument to the function signature (find_word_index in the example below).
The recommended way to use additional libraries is to import them in the function body.
from snorkel.labeling.lf import labeling_function

def find_word_index(text):
    import numpy
    # Position of "employee" among the space-separated words, as a one-element
    # array; an empty array if the word does not occur.
    try:
        idx = numpy.array([text.split(" ").index("employee")])
    except ValueError:
        idx = numpy.array([])
    return idx

@labeling_function(name="my_lf", resources=dict(find_word_index=find_word_index))
def lf(x, find_word_index):
    import numpy
    idx = find_word_index(x.text)
    # Vote only when the word was found and appears early in the text
    if idx.size > 0 and numpy.mean(idx) <= 1000:
        return "employment"
    return "UNKNOWN"

sf.add_code_lf(node, lf, label="employment")
While resources can be a nested dictionary, functions should be its direct children; otherwise, they would break when Snorkel Flow upgrades to a newer version of Python. See @labeling_function for more details.
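Conceptually, each key in resources is matched to a keyword argument of the same name. The sketch below mimics that matching in plain Python. call_with_resources, first_index, and lf_logic are hypothetical names written for illustration, and the sketch says nothing about how the decorator is implemented internally.

```python
import inspect
from types import SimpleNamespace


def call_with_resources(fn, x, resources):
    """Hypothetical helper: call fn(x, **resources) after checking names match."""
    params = list(inspect.signature(fn).parameters)
    missing = [name for name in resources if name not in params[1:]]
    if missing:
        raise TypeError(f"no matching keyword argument for: {missing}")
    return fn(x, **resources)


def first_index(text, word="employee"):
    """Return the position of word among space-separated tokens, or -1."""
    tokens = text.split(" ")
    return tokens.index(word) if word in tokens else -1


def lf_logic(x, first_index):
    # The resource arrives under the same name as its key in the dict.
    idx = first_index(x.text)
    return "employment" if 0 <= idx <= 1000 else "UNKNOWN"


resources = dict(first_index=first_index)
hit = call_with_resources(lf_logic, SimpleNamespace(text="the employee shall"), resources)
miss = call_with_resources(lf_logic, SimpleNamespace(text="lease terms"), resources)
print(hit, miss)
```

A key in resources without a matching parameter name raises a TypeError in this sketch, which is the kind of mismatch to watch for when renaming either side.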
Installing additional NLTK packages for use in an LF or operator
Snorkel Flow comes with the popular and vader_lexicon NLTK packages pre-installed. If you need additional NLTK packages, you can install them into the CUSTOM_NLTK_PATH directory. After installation, all code LFs can use them. A simple example is shown below.
from typing import Callable, Dict

import nltk
import pandas as pd
from snorkelflow.studio import resources_fn_labeling_function
from snorkelflow.utils.file import CUSTOM_NLTK_PATH

nltk.download("punkt", download_dir=CUSTOM_NLTK_PATH)
nltk.download("averaged_perceptron_tagger", download_dir=CUSTOM_NLTK_PATH)

def get_nltk_tokenizer_parser() -> Dict:
    import nltk
    return {"tokenize": nltk.word_tokenize, "parse": nltk.pos_tag}

@resources_fn_labeling_function(
    name="lf_1", resources_fn=get_nltk_tokenizer_parser,
)
def lf_1(x: pd.Series, tokenize: Callable, parse: Callable) -> str:
    pos_pairs = parse(tokenize(x.span_preview))
    for token, pos in pos_pairs:
        if pos == "NNP":
            return "employment"
    return "UNKNOWN"

sf.add_code_lf(node, lf_1, label="employment")
Sampler configs for data loading and model training
Snorkel Flow offers a variety of options for data sampling when loading datasets or training models. This is particularly useful when loading large or unbalanced datasets.
Sampling options are passed via a sampler configuration in the Python SDK for functions like get_node_data and train_model.
The sampler configuration is a dictionary with the following three keys: strategy (required), params (optional), and class_counts (optional). The options for params and class_counts vary by sampling strategy.
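The key rules above can be checked before a configuration is sent to the platform. The following validator is a hypothetical helper, not an SDK function. Its list of strategy names is taken from the strategies described in this section, and it checks only the top-level keys, not the strategy-specific params.

```python
KNOWN_STRATEGIES = {"none", "fixed", "class_count", "target_proportion", "auto"}
ALLOWED_KEYS = {"strategy", "params", "class_counts"}


def check_sampler_config(config):
    """Hypothetical helper: return a list of problems found in a sampler config."""
    problems = []
    unknown = set(config) - ALLOWED_KEYS
    if unknown:
        problems.append(f"unknown keys: {sorted(unknown)}")
    if "strategy" not in config:
        problems.append("missing required key: strategy")
    elif config["strategy"] not in KNOWN_STRATEGIES:
        problems.append(f"unknown strategy: {config['strategy']}")
    if "params" in config and not isinstance(config["params"], dict):
        problems.append("params must be a dictionary")
    return problems


print(check_sampler_config({"strategy": "fixed", "params": {"p": 0.5}}))
print(check_sampler_config({"params": {"p": 0.5}, "seed": 1}))
```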
none
No sampling is performed.
Example
sampler_config = {"strategy": "none"}
fixed
Sample a fixed fraction of each class.
Params
p: fraction of each class to sample (must be > 0 and <= 1)
Example
sampler_config = {"strategy": "fixed", "params": {"p": 0.5}}
class_count
Sample the provided number of examples for each class. Classes are upsampled or downsampled to match the target counts if needed.
Class counts: the number of samples per class
Example
sampler_config = {"strategy": "class_count", "class_counts": [100, 200, 100]}
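To make upsampling and downsampling concrete, the sketch below resamples a toy label list to fixed per-class counts. It is an illustration only: the function name is hypothetical, it assumes that class_counts is ordered by integer class label, and it upsamples by drawing with replacement, which may differ from what the platform does.

```python
import random
from collections import Counter


def resample_to_counts(labels, class_counts, seed=0):
    """Hypothetical sketch: return row indices so class c has class_counts[c] rows."""
    rng = random.Random(seed)
    chosen = []
    for cls, target in enumerate(class_counts):
        pool = [i for i, y in enumerate(labels) if y == cls]
        if not pool:
            continue  # nothing to sample from for this class
        if target <= len(pool):
            chosen.extend(rng.sample(pool, target))     # downsample, no repeats
        else:
            chosen.extend(rng.choices(pool, k=target))  # upsample, with repeats
    return chosen


labels = [0] * 8 + [1] * 2  # unbalanced toy data
indices = resample_to_counts(labels, class_counts=[4, 4])
print(Counter(labels[i] for i in indices))
```

In this toy run, class 0 is cut from 8 rows to 4 and class 1 is grown from 2 rows to 4, so some class 1 rows necessarily appear more than once.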
target_proportion
Attempt to match the provided class proportions. Upsample classes to match the target proportions if needed.
Class counts: unnormalized proportions for each class
Params
max_sample_multiple: the sampled dataset will be no more than max_sample_multiple times the size of the original dataset
Example
sampler_config = {
"strategy": "target_proportion",
"params": {"max_sample_multiple": 5},
"class_counts": [0.333, 0.333, 0.333],
}
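Because the proportions are unnormalized, only their ratios matter. The sketch below normalizes them and computes the size cap implied by max_sample_multiple. The helper name and the dataset size of 1,000 rows are made up for illustration.

```python
def normalize(proportions):
    """Scale unnormalized proportions so they sum to 1."""
    total = sum(proportions)
    if total <= 0:
        raise ValueError("proportions must sum to a positive number")
    return [p / total for p in proportions]


original_size = 1000  # made-up dataset size
max_sample_multiple = 5

print(normalize([0.333, 0.333, 0.333]))     # three equal shares
print(normalize([1, 1, 2]))                 # [0.25, 0.25, 0.5]
print(max_sample_multiple * original_size)  # upper bound on sampled rows: 5000
```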
auto
Attempt to match the ground truth class proportions of the valid set. Upsample classes to match the target proportions if needed.
Params
max_sample_multiple: the sampled dataset will be no more than max_sample_multiple times the size of the original dataset
min_total: minimum total number of valid set labels to perform sampling; if not met, no sampling occurs
min_per_class: minimum number of valid set labels for each class to perform sampling; if not met, no sampling occurs
Example
sampler_config = {
"strategy": "auto",
"params": {"max_sample_multiple": 5, "min_total": 100},
}
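The two thresholds act as guards: sampling is attempted only when the valid set has enough labels overall and per class. The sketch below expresses that check in plain Python. The function name and the fallback shown (returning None when a guard fails) are assumptions for illustration, not the platform's implementation.

```python
from collections import Counter


def valid_set_proportions(valid_labels, min_total=0, min_per_class=0):
    """Hypothetical sketch: class proportions of the valid set, or None if a guard fails."""
    counts = Counter(valid_labels)
    total = sum(counts.values())
    if total < min_total:
        return None  # too few labels overall: no sampling
    if any(n < min_per_class for n in counts.values()):
        return None  # some class has too few labels: no sampling
    return {cls: n / total for cls, n in sorted(counts.items())}


valid_labels = ["A"] * 75 + ["B"] * 25
print(valid_set_proportions(valid_labels, min_total=100))
print(valid_set_proportions(valid_labels, min_total=200))
print(valid_set_proportions(valid_labels, min_per_class=30))
```

Only the first call passes both guards. The second fails min_total and the third fails min_per_class for class "B".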