Skip to main content
Version: 0.94

Quick start

Installing the SDK

The basic SDK can be installed on any Python environment as below:

pip install "snorkelflow[sdk]" \
--extra-index-url https://"$SNORKELFLOW_API_KEY"@{your-snorkel-flow-hostname}/sdk/repo

Please ask your IT team about the IP address / hostname of your Snorkel Flow installation. See Authentication for how to generate an API key.

NOTE

Special characters in the API key, such as /, have to be percent-encoded (see here for details). Use the code snippet below to percent-encode your API key.

import urllib.parse
print(urllib.parse.quote("your-api-key", safe=""))

Authentication

Authentication Requirements

NOTE

The version of the Python SDK accessible from in-platform notebooks is pre-authenticated with an API Key that is auto-generated and injected into the environment by Snorkel Flow. If you are using the in-platform notebook environment you do not need to worry about authenticating the SDK manually.

All API requests to Snorkel Flow services must be authenticated. If you are interacting with these services via the Snorkel Flow SDK from outside of the in-platform notebook environment you must provide an API key to the SDK which it will use to authenticate requests.

Generating an API Key

Instructions on generating an API key can be found within the Generating API Keys guide.

Authenticating the SDK Client

The Python SDK client needs to use an authenticated SnorkeflFlowContext object to make API requests to Snorkel Flow services. This object can either pull an API key directly from your environment variables or can be provided an API key explicitly upon construction.

Authenticating via Environment Variables

If you place your API key under the SNORKELFLOW_API_KEY environment variable the SDK will find and use this API key automatically.

bash $ export SNORKELFLOW_API_KEY="XXXXYYYYZZZZ"

Authenticating Directly

The Snorkel Flow client can be authenticated by passing an API key through the api_key argument while constructing the SnorkelFlowContext object.

import snorkelflow.client as sf
ctx = sf.SnorkelFlowContext.from_kwargs(
...,
api_key="XXXXYYYYZZZZ"
)

Quick Start: End-to-End Document Classification

This quick start guide will walk you through a full end-to-end example of using Snorkel Flow to build a document classification model. In practice, your Snorkel Flow journey will involve heavy interplay between the UI and the SDK, but for the purposes of this guide, we’ll stick to the SDK.

In our task, we will be classifying 20,000 contracts into one of the four following categories: : - Loan: A loan agreement

  • Services: A services agreement
  • Stock: A stock purchase agreement
  • Employment: An employment agreement

The majority of these contracts will not have any labels for us to train a fully supervised model. We will use the Snorkel Flow SDK to ingest our dataset, write code-based labeling functions, train a model, and deploy that model for external use.

Begin by importing the necessary packages

import pandas as pd
import snorkelflow.client as sf
from snorkelflow.models.model_configs import SKLEARN_LOGISTIC_REGRESSION_CONFIG
from snorkelflow.sdk import Dataset, MLflowDeployment
from snorkel.labeling.lf import labeling_function
ctx = sf.SnorkelFlowContext.from_kwargs()

Creating a Dataset

We will use the snorkelflow.sdk module to create a Dataset. Snorkel Flow Datasets are the main mechanism for ingesting and interacting with source data. Upstream of Snorkel, we have already split up our data into several data splits (e.g. “train”, “valid”, “test”) for our model development pipeline. Our data contains some null/missing values, which might cause some issues in our modeling pipeline. Our data upload will then consist of imputing our null values, then uploading the files to Snorkel Flow.

contracts_dataset = Dataset.create("contracts-clf-dataset")
# Upload the train splits
train_df = pd.read_parquet("s3://snorkel-contracts-dataset/train.parquet")
train_df = train_df.fillna("")
contracts_dataset.create_datasource(train_df, uid_col="uid", split="train")

dev_df = pd.read_parquet("s3://snorkel-contracts-dataset/dev.parquet")
dev_df = dev_df.fillna("")
contracts_dataset.create_datasource(dev_df, uid_col="uid", split="train")

valid_df = pd.read_parquet("s3://snorkel-contracts-dataset/valid.parquet")
valid_df = valid_df.fillna("")
contracts_dataset.create_datasource(valid_df, uid_col="uid", split="valid")

test_df = pd.read_parquet("s3://snorkel-contracts-dataset/test.parquet")
test_df = test_df.fillna("")
contracts_dataset.create_datasource(test_df, uid_col="uid", split="test")

You’ll notice that the second file we uploaded ends in dev.parquet. In the Snorkel workflow, it’s useful to have some very small selection of manually-labeled data in your train split to help you write labeling functions (you will sometimes see this called the “development split”). If we so desire, we can view the data in our dataset by running contracts_dataset.get_dataframe(...).

Creating an Application

Now that we have our data uploaded, we can create an application. Applications string together data processing logic, programmatic labeling, and model training into a single interface that can later be deployed end-to-end. To create an application, we will use the Snorkel Flow client snorkelflow.client that we imported earlier.

application_uid = sf.create_classification_application(
"contracts-clf",
dataset="contracts-clf-dataset",
input_schema=["text"],
labels = ["services", "stock", "loan", "employment"],
label_col = "label"
)["application_uid"]

An application is set up as a “directed acyclic graph” (DAG), composed of a series of data transformations. This means that data moves from one transformation to the next, and is modified by one transformation at a time, but the data never flows backwards or loops.

The individual elements that make up the DAG are called nodes. Some nodes are Operators, transformations that manipulate the data by changing its rows and columns. Other nodes are Model Nodes, which are environments for iterating on training set development and model training. Most of the core programmatic labeling loop will happen in these “Model nodes”. To interact with a model node through the SDK, we will need its UID. Additionally, we need to signal to our application that we are done with set up and want to start developing our training set. To do this, we need to activate the node with the sf.add_active_datasources function.

model_node_uid = sf.get_model_node(application_uid)
activate_datasources_job = sf.add_active_datasources(model_node_uid, sync=True)

We will use this model node UID to write labeling functions and train our models.

Writing Labeling Functions

The best way to view, filter, tag, and comment on your data is through the Snorkel Flow UI. In this way, the UI and SDK have highly complementary roles in the Snorkel Flow workflow. The SDK can be used to write custom, high-quality labeling functions to programmatically label unlabeled data. Here, we will create 4 labeling functions to label each of our 4 classes. In practice, you would come up with these labeling functions based on your interactions with the data in the UI.

To view how performant our labeling functions are, we can run sf.get_lf_summary. This will return a Pandas DataFrame with evaluation metrics for each labeling function.

@labeling_function(name="employment_lf")
def lf_emp(x):
if "employment" in x.text:
return "employment"
return "UNKNOWN"
sf.add_code_lf(model_node_uid, lf_emp, label="employment")

@labeling_function(name="stock_lf")
def lf_stock(x):
if "STOCK PURCHASE AGREEMENT" in x.text:
return "stock"
return "UNKNOWN"
sf.add_code_lf(model_node_uid, lf_stock, label="stock")

@labeling_function(name="services_lf")
def lf_services(x):
import re
if re.search(r"This.{1,50}Service Agreement", x.text, re.IGNORECASE):
return "services"
return "UNKNOWN"
sf.add_code_lf(model_node_uid, lf_services, label="services")

@labeling_function(name="loan_lf")
def lf_loan(x):
import re
if re.search(r"loan agreement", x.text, re.IGNORECASE):
return "loan"
return "UNKNOWN"
sf.add_code_lf(model_node_uid, lf_loan, label="loan")

sf.get_lf_summary(model_node_uid)

Code LFs can incorporate external libraries, algorithms, external databases, and more – they are extremely flexible tools for programmatically labeling data. Snorkel users have used Code LFs to create rich embeddings-based labeling functions, built LFs on top of external deep learning models, and more.

Training a Model

Once we have our labeling functions, we can begin to create a training set and train a model. There are three parts to this process – we aggregate our LFs as an LF Package, use the Snorkel Flow label model to aggregate LF votes into a training set, and then train a machine learning classifier on this training set. After that, we will view model metrics to make sure our model is performing how we’d expect. We’ll check out our metrics against a previously held-out validation split to see if we need to do any more tuning.

sf.package_lfs(model_node_uid)
sf.add_training_set(model_node_uid)
model_uid = sf.train_model(
model_node_uid,
feature_fields=["text"],
model_description="My first model",
model_config = SKLEARN_LOGISTIC_REGRESSION_CONFIG.dict()
)['model_uid']

sf.get_model_metrics(model_node_uid, model_uid, split="valid")

In a real-world setting, you would iterate on this process until you are satisfied with your model’s performance. You can use Snorkel Flow’s in-UI built-in error analysis tools to find slices of data that need more coverage or need more accurate labeling functions. You can also create your own custom error analysis tools, and use them to iterate on your LFs and improve your models!

Deployment

Once you are satisfied with your score, you can deploy your model as an MLFlow package to use it in your external data pipelines. In this example, we’ll create an MLFlow deployment and then run our test dataset through it before downloading it for use elsewhere in our data pipeline.

sf.commit_model_to_node(model_node_uid, model_uid)
deployment = MLflowDeployment.create("contracts-clf", name="contracts-clf-deployment")
test_df = contracts_dataset.get_dataframe(split="test", max_rows=5)
results = deployment.execute(df=test_df)
# Examine the "results" DataFrame to make sure it looks good
deployment.download('./deployment-path/')

That’s it! In this tutorial, we have uploaded a messy dataset, cleaned it up, ingested it into Snorkel, written labeling functions, viewed LF stats, created a programmatically labeled training set, trained a model, and deployed it for external use. This is just a snippet of what is possible with Snorkel Flow and the SDK – the SDK is a powerful tool for interacting with the core Snorkel workflow programmatically, and can extend and empower you to create more robust and flexible data pipelines.