SDK quickstart
This guide shows you how to access or install Snorkel's SDK, and then use it to train a simple document classification model, mirroring the Document classification GUI tutorial.
Install the SDK locally
Requirements
- Python version 3.9-3.11
- A Snorkel account with admin or developer access
- Your Snorkel API key
- Your Snorkel instance host name or IP address. You can copy the host name from your Snorkel instance's URL.
Install the SDK
Run the following command to install the basic Snorkel SDK in your Python environment:
pip install "snorkelflow[sdk]" "aiobotocore==2.17.0" \
--extra-index-url https://{SNORKELFLOW_API_KEY}@{your-snorkel-hostname}/sdk/repo
Special characters in the API key, such as /, must be percent-encoded before they can be used in the index URL.
Use the code snippet below to percent-encode your API key.
import urllib.parse
print(urllib.parse.quote("your-api-key", safe=""))
Known issues
- aiobotocore==2.17.0 must be included explicitly in the installation command.
Access the SDK from a Snorkel instance
Requirements
- A Snorkel account with admin or developer access
- Your Snorkel API key
You can call the Snorkel SDK from a Jupyter notebook hosted on your Snorkel instance.
- Select Notebook from the left navigation.
- Select + to create a new notebook from scratch, or upload one.
- Select Python 3 (ipykernel) from the Notebook section. A new Jupyter notebook opens.
- Authenticate with Snorkel. To create a new API key, follow the steps for generating an API key.
import snorkelflow.client as sf
# Use your Admin or Developer Snorkel API key
api_key = 'abcdefg' #INPUT - Replace with your API key
# Set your workspace
workspace_name = 'YOUR_WORKSPACE_NAME' #INPUT - Replace with your workspace name
# Authenticate with Snorkel Flow
ctx = sf.SnorkelFlowContext.from_kwargs(
    api_key=api_key,
    workspace_name=workspace_name,
)
Now you can use Snorkel's SDK.
Authentication
All API requests to Snorkel must be authenticated. The Python SDK client uses an authenticated SnorkelFlowContext object to make API requests to Snorkel services.
You will need an API key to authenticate.
Generate an API key
Follow the steps for generating an API key in your Snorkel instance, then use that key in the authentication flows below.
Authenticate from an external system
When you connect to Snorkel from your local machine or another external system, you must provide additional settings and authentication secrets. Use the following connection template:
import os
import snorkelflow.client as sf
# Snorkel Flow SDK configuration
SF_CONFIG = {
    "endpoint": "https://<your-snorkel-hostname>",
    "minio_endpoint": "https://<your-minio-endpoint>",
    "api_key": "<your-api-key>",
    "minio_access_key": "<your-minio-access-key>",
    "minio_secret_key": "<your-minio-secret-key>",
    "workspace_name": "<your-workspace-name>",
    "debug": True,  # Optional: set to False to disable debug logging
}
# Set MinIO-related environment variables
os.environ["MINIO_URL"] = "https://<your-minio-api-url>"
os.environ["MINIO_ACCESS_KEY"] = "<your-minio-access-key>"
os.environ["MINIO_SECRET_KEY"] = "<your-minio-secret-key>"
# Initialize Snorkel Flow context
ctx = sf.SnorkelFlowContext.from_endpoint_url(**SF_CONFIG)
- endpoint: You can copy the host name from your Snorkel instance's URL.
- api_key: Your Snorkel API key.
- Contact your Snorkel admin or support representative to obtain your minio_endpoint, minio_access_key, and minio_secret_key.
Authenticate from a Snorkel-hosted instance
When you call the SDK from a Snorkel instance (through a Jupyter notebook), you can provide your API key from an environment variable, or explicitly.
If you set your API key as the SNORKELFLOW_API_KEY environment variable, the SDK will find and use this API key automatically.
export SNORKELFLOW_API_KEY="abcdefg"
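For example, a minimal sketch of this pattern, assuming the environment variable is set before the notebook or script starts and that from_kwargs can then be called without an explicit api_key:
import snorkelflow.client as sf
# SNORKELFLOW_API_KEY is read from the environment, so api_key is omitted here
ctx = sf.SnorkelFlowContext.from_kwargs(
    workspace_name='YOUR_WORKSPACE_NAME',  # INPUT - Replace with your workspace name
)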
You can also pass an API key through the api_key argument while constructing the SnorkelFlowContext object.
import snorkelflow.client as sf
# Use your Admin or Developer Snorkel API key
api_key = 'abcdefg' #INPUT - Replace with your API key
# Set your workspace
workspace_name = 'YOUR_WORKSPACE_NAME' #INPUT - Replace with your workspace name
# Authenticate with Snorkel Flow
ctx = sf.SnorkelFlowContext.from_kwargs(
    api_key=api_key,
    workspace_name=workspace_name,
)
Quick Start: End-to-End Document Classification
This quick start guide will walk you through a full end-to-end example of using Snorkel Flow to build a document classification model. In practice, your Snorkel Flow journey will involve heavy interplay between the UI and the SDK, but for the purposes of this guide, we'll stick to the SDK.
In our task, we will be classifying 20,000 contracts into one of the four following categories:
- Loan: A loan agreement
- Services: A services agreement
- Stock: A stock purchase agreement
- Employment: An employment agreement
The majority of these contracts do not have labels, so we cannot train a fully supervised model directly. We will use the Snorkel Flow SDK to ingest our dataset, write code-based labeling functions, train a model, and deploy that model for external use.
Begin by importing the necessary packages:
import pandas as pd
import snorkelflow.client as sf
from snorkelflow.models.model_configs import SKLEARN_LOGISTIC_REGRESSION_CONFIG
from snorkelflow.sdk import Dataset, MLflowDeployment
from snorkel.labeling.lf import labeling_function
# Connect to Snorkel. Uncomment either the hosted or external option
# And replace with the full connection block including API key
# ctx = sf.SnorkelFlowContext.from_endpoint_url( ... ) # External connection
# ctx = sf.SnorkelFlowContext.from_kwargs( ... ) # Snorkel-hosted connection
For the connection, uncomment the relevant ctx line and replace it with the full authentication block from the appropriate section above.
Creating a Dataset
We will use the snorkelflow.sdk module to create a Dataset. Snorkel datasets are the main mechanism for ingesting and interacting with source data. Upstream of Snorkel, the data is divided into several data splits ("train," "dev" as a subset of "train," "valid," and "test") for our model development pipeline. Our data contains some null/missing values, which could cause issues in our modeling pipeline, so we will impute the null values before uploading the files to Snorkel.
contracts_dataset = Dataset.create("contracts-clf-dataset")
# Upload the train splits
train_df = pd.read_parquet("s3://snorkel-contracts-dataset/train.parquet")
train_df = train_df.fillna("")
contracts_dataset.create_datasource(train_df, uid_col="uid", split="train")
dev_df = pd.read_parquet("s3://snorkel-contracts-dataset/dev.parquet")
dev_df = dev_df.fillna("")
contracts_dataset.create_datasource(dev_df, uid_col="uid", split="train")
valid_df = pd.read_parquet("s3://snorkel-contracts-dataset/valid.parquet")
valid_df = valid_df.fillna("")
contracts_dataset.create_datasource(valid_df, uid_col="uid", split="valid")
test_df = pd.read_parquet("s3://snorkel-contracts-dataset/test.parquet")
test_df = test_df.fillna("")
contracts_dataset.create_datasource(test_df, uid_col="uid", split="test")
You'll notice that the second file has the name dev.parquet and is loaded with split="train". A dev split is a subset of the train split that enables faster model development cycles. Snorkel still uses the entire training split when actually training a model. The dev split is primarily used to accelerate the development workflow in Studio by making data loading, labeling function application, and performance analysis faster.
After uploading, you have the option to view the data in your dataset by running contracts_dataset.get_dataframe(...).
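For example, a quick sanity check on the upload might look like this (the split and row count here are just illustrative):
# Preview a few rows of the train split to confirm the upload
preview_df = contracts_dataset.get_dataframe(split="train", max_rows=5)
print(preview_df.head())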
Creating an Application
Now that we have our data uploaded, we can create an application. Applications string together data processing logic, programmatic labeling, and model training into a single interface that can later be deployed end-to-end.
To create an application, we will use the Snorkel Flow client snorkelflow.client that we imported earlier.
application_uid = sf.create_classification_application(
    "contracts-clf",
    dataset="contracts-clf-dataset",
    input_schema=["text"],
    labels=["services", "stock", "loan", "employment"],
    label_col="label",
)["application_uid"]
An application is set up as a "directed acyclic graph" (DAG), composed of a series of data transformations. This means that data moves from one transformation to the next, and is modified by one transformation at a time, but the data never flows backwards or loops.
The individual elements that make up the DAG are called nodes. Some nodes are Operators, transformations that manipulate the data by changing its rows and columns. Other nodes are Model Nodes, which are environments for iterating on training set development and model training.
Most of the core programmatic labeling loop will happen in these "Model nodes". To interact with a model node through the SDK, we will need its UID. Additionally, we need to signal to our application that we are done with setup and want to start developing our training set. To do this, we need to activate the node with the sf.add_active_datasources function.
model_node_uid = sf.get_model_node(application_uid)
activate_datasources_job = sf.add_active_datasources(model_node_uid, sync=True)
We will use this model node UID to write labeling functions and train our models.
Writing Labeling Functions
The best way to view, filter, slice, and comment on your data is through the Snorkel Flow UI. In this way, the UI and SDK have highly complementary roles in the Snorkel Flow workflow. The SDK can be used to write custom, high-quality labeling functions to programmatically label unlabeled data. Here, we will create 4 labeling functions to label each of our 4 classes. In practice, you would come up with these labeling functions based on your interactions with the data in the UI.
To view how performant our labeling functions are, we can run sf.get_lf_summary. This will return a Pandas DataFrame with evaluation metrics for each labeling function.
@labeling_function(name="employment_lf")
def lf_emp(x):
if "employment" in x.text:
return "employment"
return "UNKNOWN"
sf.add_code_lf(model_node_uid, lf_emp, label="employment")
@labeling_function(name="stock_lf")
def lf_stock(x):
if "STOCK PURCHASE AGREEMENT" in x.text:
return "stock"
return "UNKNOWN"
sf.add_code_lf(model_node_uid, lf_stock, label="stock")
@labeling_function(name="services_lf")
def lf_services(x):
import re
if re.search(r"This.{1,50}Service Agreement", x.text, re.IGNORECASE):
return "services"
return "UNKNOWN"
sf.add_code_lf(model_node_uid, lf_services, label="services")
@labeling_function(name="loan_lf")
def lf_loan(x):
import re
if re.search(r"loan agreement", x.text, re.IGNORECASE):
return "loan"
return "UNKNOWN"
sf.add_code_lf(model_node_uid, lf_loan, label="loan")
sf.get_lf_summary(model_node_uid)
Code LFs can incorporate external libraries, algorithms, external databases, and more: they are extremely flexible tools for programmatically labeling data. Snorkel users have used Code LFs to create rich embeddings-based labeling functions, build LFs on top of external deep learning models, and more.
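As a rough sketch of that flexibility, here is a hypothetical Code LF that votes based on a small keyword set. The keyword list, threshold, and LF name are illustrative assumptions, not part of this tutorial's dataset.
# Hypothetical example: vote "employment" when several employment-specific
# terms co-occur in the document text
EMPLOYMENT_TERMS = {"employee", "employer", "compensation", "severance"}

@labeling_function(name="employment_terms_lf")
def lf_employment_terms(x):
    text = x.text.lower()
    if sum(term in text for term in EMPLOYMENT_TERMS) >= 2:
        return "employment"
    return "UNKNOWN"
sf.add_code_lf(model_node_uid, lf_employment_terms, label="employment")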
Training a Model
Once we have our labeling functions, we can begin to create a training set and train a model. There are three parts to this process: we aggregate our LFs as an LF Package, use the Snorkel Flow label model to aggregate LF votes into a training set, and then train a machine learning classifier on this training set. After that, we will view model metrics to make sure our model is performing as we'd expect. We'll check our metrics against a previously held-out validation split to see if we need to do any more tuning.
sf.package_lfs(model_node_uid)
sf.add_training_set(model_node_uid)
model_uid = sf.train_model(
    model_node_uid,
    feature_fields=["text"],
    model_description="My first model",
    model_config=SKLEARN_LOGISTIC_REGRESSION_CONFIG.dict(),
)['model_uid']
sf.get_model_metrics(model_node_uid, model_uid, split="valid")
In a real-world setting, you would iterate on this process until you are satisfied with your model's performance. You can use Snorkel Flow's built-in error analysis tools in the UI to find slices of data that need more coverage or more accurate labeling functions. You can also create your own custom error analysis tools, and use them to iterate on your LFs and improve your models!
Deployment
Once you are satisfied with your score, you can deploy your model as an MLflow package to use it in your external data pipelines. In this example, we'll create an MLflow deployment and then run our test dataset through it before downloading it for use elsewhere in our data pipeline.
sf.commit_model_to_node(model_node_uid, model_uid)
deployment = MLflowDeployment.create("contracts-clf", name="contracts-clf-deployment")
test_df = contracts_dataset.get_dataframe(split="test", max_rows=5)
results = deployment.execute(df=test_df)
# Examine the "results" DataFrame to make sure it looks good
deployment.download('./deployment-path/')
That's it! In this tutorial, we have taken a messy dataset, cleaned it up, ingested it into Snorkel, written labeling functions, viewed LF stats, created a programmatically labeled training set, trained a model, and deployed it for external use. This is just a snippet of what is possible with Snorkel Flow and the SDK. The SDK is a powerful tool for interacting with the core Snorkel workflow programmatically, and it can empower you to create more robust and flexible data pipelines.