Version: 0.93

snorkelflow.sdk.Dataset

class snorkelflow.sdk.Dataset(name, uid, mta_enabled)

Bases: object

The Dataset object represents a dataset in Snorkel Flow.

Datasets Quickstart

In this quickstart, we will create a Dataset and upload a file to that Dataset as a data source. We will then show how you might go about ingesting that data into the platform.

We will need the following imports:

from snorkelflow.sdk import Dataset
import snorkelflow.client as sf
import pandas as pd
ctx = sf.SnorkelFlowContext.from_kwargs()

We will begin by creating a new Dataset.

>>> contracts_dataset = Dataset.create("contracts-dataset")
Successfully created dataset contracts-dataset with UID 0 in workspace 0

Next, we will attempt to save a file to the Dataset as a data source. This file will be in S3. File upload will initially fail because this file contains null values.

>>> contracts_dataset.create_datasource("s3://snorkel-contracts-dataset/dev.parquet", uid_col="uid", split="train")
UserInputError: Errors...

In this particular example, we decide we don’t care about these rows, so we can use Pandas to edit the file and remove the null values. We can then re-upload the data, this time uploading the DataFrame directly without needing to save it to a file again. In some other cases, you may want to either edit those null cells or fix them in your upstream data pipeline.

>>> df = pd.read_parquet("s3://snorkel-contracts-dataset/dev.parquet")
>>> df = df.dropna()
>>> contracts_dataset.create_datasource(df, uid_col="uid", split="train")
+0.07s Starting data ingestion
+1.85s Ingesting data
+2.05s Data ingestion complete

To verify that this worked correctly, we can view this Dataset’s data sources.

>>> contracts_dataset.datasources
[{'datasource_uid': 668,...}]

Dataset Concepts

Datasets

Datasets are how your data is represented in Snorkel Flow. Snorkel Flow projects always begin with a single Dataset. Datasets bring external data into Snorkel Flow and help manage that data once it has been ingested. Datasets are composed of individual chunks of data, called data sources, and provide an interface for managing individual data sources.

Data Sources

Data sources are the individual chunks of data that make up a Dataset. A data source can be a file you upload from local storage, a file located in a remote (S3, MinIO, etc.) storage service, or an in-memory Pandas DataFrame. Data sources shouldn’t be touched directly, but should be managed by interfacing with their parent Dataset. The best way to deal with data sources is to treat them as blocks of data, which can be added and removed but only occasionally changed. Data sources can be given names during their creation, but are usually referred to using a data source UID, an integer ID assigned to each data source when it is created.

Derived Data Sources

When an application is created using a dataset, Snorkel Flow will create a derived data source for each data source in the dataset. Derived data sources are intermediate representations of data that track the lineage of the data as it is being processed and are associated with only one application. Note that some operations, such as changing the split of a data source, don’t propagate to any of the derived data sources once they are derived, and vice versa. Derived data sources are viewable in the Snorkel Flow UI through the “View Data Sources” button, accessible from the “Develop” screen of an application.

Modifying Data

In general, data sources should be treated as immutable. This means that you should avoid modifying the underlying data source once it has been uploaded. If your goal is to filter out rows, add feature columns, or remove feature columns, you should use an Operator to do so. Alternatively, you can modify your data upstream of Snorkel and create a new Dataset with your edited data.

The Python SDK provides limited support for specific one-off operations on data sources. Sometimes you might need to reformat the data in an existing column to make it compatible with processing logic. In this case, you can use the dataset.update_datasource_data method to swap out an existing data source for a new one with the updated data. However, be aware that this is an irreversible change, and updating data in this way is an expensive operation that will require all downstream applications to be refreshed.

Splits

Data sources belong to splits. Splits help dictate how the data will be used in the model development process. Data sources allocated to the train split will be used for model training and labeling function development. Data sources allocated to the valid split will be used to validate models iteratively and to perform error analysis. Data sources allocated to the test split will be used to evaluate the final model. Data source splits may be updated as needed, but be aware that model metrics and labeling function performance will change based on how the splits are allocated.
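As a minimal sketch of how one table could be divided into the three splits before upload, the following uses only Pandas. The helper name `split_frame`, the 80/10/10 proportions, and the sample data are illustrative assumptions, not part of the Snorkel Flow SDK.

```python
import pandas as pd


def split_frame(df, valid_frac=0.1, test_frac=0.1, seed=123):
    """Shuffle df and cut it into train/valid/test frames (hypothetical helper)."""
    shuffled = df.sample(frac=1.0, random_state=seed)
    n_valid = int(len(shuffled) * valid_frac)
    n_test = int(len(shuffled) * test_frac)
    return {
        "valid": shuffled.iloc[:n_valid],
        "test": shuffled.iloc[n_valid:n_valid + n_test],
        "train": shuffled.iloc[n_valid + n_test:],
    }


# Sample data with a unique, non-negative integer index column named "uid".
df = pd.DataFrame({"uid": range(100), "text": [f"doc {i}" for i in range(100)]})
frames = split_frame(df)
for split, frame in frames.items():
    print(split, len(frame))
```

Each resulting frame could then be uploaded as its own data source, for example with `my_dataset.create_datasource(frame, uid_col="uid", split=split)`. Because every row keeps its original `uid` value, the index stays unique across the three data sources.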

Data Upload Guardrails

When you upload data to Snorkel Flow, it must pass a series of safety checks to ensure that the data is valid and safe to load into the platform. These checks include:

  • Number of rows: A single data source should not exceed 10 million rows. If your data source exceeds this limit, you should split it into multiple data sources before uploading.

  • Column memory: The average memory usage of a single column must be under 20MB across all columns in your data source. For performance, the average column memory usage should be under 5MB. If your data source exceeds this limit, you should split it into multiple data sources before uploading.

  • Null values: Snorkel Flow will not permit data to be uploaded if any null values exist in that data source. If you have null values in your data, you might want to clean them up with the Pandas fillna() method before uploading.

  • Unique integer index: Snorkel Flow requires that each data source have a unique integer index column. The values in this index must be non-negative integers that are unique across all data sources in the Dataset. If your data does not already have this stable index column, you must create one before uploading.

  • Consistent schema: All data sources in a single Dataset should have the same columns. All columns that are in multiple data sources must have the same type. If you have columns that exist in some data sources but not others, you may see unexpected behavior in downstream tasks.
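Some of these guardrails can be approximated locally before an upload is attempted. The sketch below is a rough Pandas pre-check, not the platform's actual validation logic: the function name `preflight` is hypothetical, and treating 20MB as 20 * 1024 * 1024 bytes is an assumption. It cannot check uniqueness across other data sources already in the Dataset, nor schema consistency between data sources.

```python
import pandas as pd

MAX_ROWS = 10_000_000                 # row limit quoted above
MAX_AVG_COL_BYTES = 20 * 1024 * 1024  # 20MB limit quoted above (byte size is an assumption)


def preflight(df, uid_col):
    """Return a list of problems found in df; an empty list means none were found."""
    problems = []
    if len(df) > MAX_ROWS:
        problems.append(f"{len(df)} rows exceeds the limit of {MAX_ROWS}")
    if df.shape[1] > 0:
        # Average in-memory size of one column, in bytes.
        avg_bytes = df.memory_usage(index=False, deep=True).mean()
        if avg_bytes > MAX_AVG_COL_BYTES:
            problems.append(f"average column memory is {avg_bytes:.0f} bytes")
    has_null = df.isna().any()
    null_cols = has_null[has_null].index.tolist()
    if null_cols:
        problems.append(f"null values in columns: {null_cols}")
    if uid_col not in df.columns:
        problems.append(f"missing index column: {uid_col}")
        return problems
    uids = df[uid_col]
    if not pd.api.types.is_integer_dtype(uids):
        problems.append(f"{uid_col} is not an integer column")
    elif (uids < 0).any():
        problems.append(f"{uid_col} contains negative values")
    if uids.duplicated().any():
        problems.append(f"{uid_col} contains duplicate values")
    return problems


good = pd.DataFrame({"uid": [0, 1, 2], "text": ["a", "b", "c"]})
bad = pd.DataFrame({"uid": [0, 0, -1], "text": ["a", None, "c"]})
print(preflight(good, "uid"))
print(preflight(bad, "uid"))
```

Returning a list of problems rather than raising on the first one makes it possible to fix everything in a single pass before calling `create_datasource`.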

Fetching UIDs

Methods in the Dataset class will sometimes require a UID parameter. This is the unique identifier for the Dataset within Snorkel Flow. The Dataset UID can be retrieved by calling .uid on a Dataset object. Data source methods will sometimes require a data source UID, which can be found by inspecting my_dataset.datasources. The data source UID is the datasource_uid field of each dictionary in the returned list.
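As an illustration, data source UIDs can be pulled out of that metadata with plain Python. The sample list below is hand-written to mirror the shape shown in the examples on this page (`datasource_uid`, `datasource_name`, `split`); the real dictionaries may contain additional keys, and both helper names are hypothetical.

```python
def datasource_uids_by_name(datasources):
    """Map each data source name to its integer UID."""
    return {ds["datasource_name"]: ds["datasource_uid"] for ds in datasources}


def uids_in_split(datasources, split):
    """List the UIDs of the data sources assigned to the given split."""
    return [ds["datasource_uid"] for ds in datasources if ds.get("split") == split]


# Stand-in for the value of my_dataset.datasources.
datasources = [
    {"datasource_uid": 1, "datasource_name": "train.csv", "split": "train"},
    {"datasource_uid": 2, "datasource_name": "valid.csv", "split": "valid"},
]

print(datasource_uids_by_name(datasources))
print(uids_in_split(datasources, "train"))
```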

__init__(name, uid, mta_enabled)

Create a dataset object in-memory with necessary properties. This constructor should not be called directly, and should instead be accessed through the create() and get() methods.

Parameters:
  • name (str) – The human-readable name of the dataset. Must be unique within the workspace

  • uid (int) – The unique integer identifier for the dataset within Snorkel Flow

  • mta_enabled (bool) – Whether or not multi-task annotation is enabled for this dataset

Methods

__init__(name, uid, mta_enabled)

Create a dataset object in-memory with necessary properties.

create(dataset_name[, enable_mta])

Creates and registers a new Dataset object.

create_batches([name, assignees, ...])

Create annotation batches for this dataset.

create_datasource(data, uid_col[, name, ...])

Creates a new data source within the Dataset from either a filepath or a Pandas DataFrame.

create_label_schema(name, data_type, ...[, ...])

Create a label schema associated with this dataset.

delete(dataset[, force])

Delete a dataset based on the provided identifier

delete_datasource(datasource_uid[, force, sync])

Delete a data source.

get(dataset)

Fetches an already-created Dataset from Snorkel Flow and returns a Dataset object that can be used to interact with files and data

get_dataframe([split, max_rows, ...])

Read the Dataset's data into an in-memory Pandas DataFrame.

list()

Get a list of all Datasets.

update([name])

Update the metadata for this dataset.

update_datasource_data(old_datasource_uid, ...)

This function allows you to replace the data of an existing data source with new data.

update_datasource_split(datasource_uid, split)

Change the split of a data source that has already been uploaded to the dataset.

Attributes

batches

A list of batches belonging to this Dataset.

datasources

A list of data sources and associated metadata belonging to this Dataset.

label_schemas

A list of label schemas belonging to this Dataset.

mta_enabled

Whether or not multi-task annotation is enabled for this dataset.

name

The human-readable name of the dataset.

uid

The unique integer identifier for the dataset within Snorkel Flow.

classmethod create(dataset_name, enable_mta=False)

Creates and registers a new Dataset object. A Dataset object organizes and collects files and other sources of data for use in Snorkel Flow. A Dataset is restricted to a particular workspace, so only users in that workspace will be able to access that Dataset. Datasets must be initialized with a unique name.

Examples

>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.create(dataset_name="my-dataset")
Successfully created dataset my-dataset with UID 0 in workspace 0
Parameters:
  • dataset_name (str) – A name for the Dataset. This name must be unique within the workspace

  • enable_mta (bool, default: False) – If True, enable multi-task annotation for this dataset.

Returns:

A Dataset object that can be used to interact with the dataset in Snorkel Flow

Return type:

Dataset

create_batches(name=None, assignees=None, label_schemas=None, batch_size=None, num_batches=None, randomize=False, random_seed=123, selection_strategy=None, split=None, x_uids=None)

Create annotation batches for this dataset.

This is the recommended entrypoint for creating batches.

Parameters:
  • name (Optional[str], default: None) – The name of the batch

  • assignees (Optional[List[int]], default: None) – The user UIDs for the assignees of the batches

  • label_schemas (Optional[List[LabelSchema]], default: None) – The label schemas assigned for the batches

  • batch_size (Optional[int], default: None) – The size of the batches

  • num_batches (Optional[int], default: None) – The number of batches

  • randomize (Optional[bool], default: False) – Whether to randomize the batches

  • random_seed (Optional[int], default: 123) – The seed for the randomization

  • selection_strategy (Optional[SelectionStrategy], default: None) – The SelectionStrategy for the batches

  • split (Optional[str], default: None) – The split (“train”, “test”, or “valid”) of the batches

  • x_uids (Optional[List[str]], default: None) – The UIDs of the data sources to create batches from

Returns:

The list of created batches

Return type:

List[Batch]

create_datasource(data, uid_col, name=None, split='train', sync=True, run_checks=True)

Creates a new data source within the Dataset from either a filepath or a Pandas DataFrame.

If you provide a filepath: A file can be a CSV or Parquet file that either exists in the local filesystem, or is accessible via an S3-compatible API (such as MinIO, or AWS S3). Files must have a stable integer index column that is unique across all data sources in the dataset.

If you provide a DataFrame: The DataFrame must have a unique integer column that does not contain duplicates across other sources of data. All DataFrame column names must be strings.

The data must pass all validation checks to be registered as a valid data source. If a DataFrame fails to pass all data validation checks, the upload will fail and the data source will not be registered.
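When a DataFrame lacks a suitable index column or has non-string column names, it can be adjusted before upload. The following is a minimal sketch using only Pandas; the helper name `with_uid_column` and the choice of offsetting the second frame by the length of the first are assumptions made for illustration.

```python
import pandas as pd


def with_uid_column(df, uid_col="uid", start=0):
    """Return a copy of df with string column names and a new integer UID column."""
    out = df.copy()
    out.columns = [str(c) for c in out.columns]
    if uid_col in out.columns:
        raise ValueError(f"column {uid_col!r} already exists")
    # Consecutive non-negative integers beginning at `start`.
    out.insert(0, uid_col, range(start, start + len(out)))
    return out


# Two frames with integer column names and no index column.
a = pd.DataFrame({0: ["x", "y"], 1: [1.0, 2.0]})
b = pd.DataFrame({0: ["p", "q", "r"], 1: [3.0, 4.0, 5.0]})

a_ready = with_uid_column(a)
# Start where the first frame ended so UIDs do not collide across data sources.
b_ready = with_uid_column(b, start=len(a_ready))
print(a_ready)
print(b_ready)
```

Generated UIDs like these are only stable if they are persisted with the data. If the same rows will be uploaded again later, it is safer to store the column upstream than to regenerate it.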

Examples

>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> my_dataset.create_datasource("my_data.csv", uid_col="id", split="train")
+0.07s Starting data ingestion
+1.85s Ingesting data
+2.05s Data ingestion complete
1 # UID of the datasource

>>> my_dataset.create_datasource(df, uid_col="id", name="dataframe-data", split="train")
+0.07s Starting data ingestion
+1.85s Ingesting data
+2.05s Data ingestion complete
1
Parameters:
  • data (Union[str, DataFrame]) – Either a path to a file that is in the local filesystem or accessible via an S3-compatible API, or a Pandas DataFrame

  • uid_col (str) – Name of the UID column for this data. The values in this column must be unique non-negative integers that are not duplicated across files

  • name (Optional[str], default: None) – The name to give this data source. If not provided, the name of the file will be used, by default None. Adding a name is strongly recommended when uploading a DataFrame

  • split (Optional[str], default: 'train') – The name of the data split this data belongs to, by default Splits.train

  • sync (Optional[bool], default: True) – Whether execution should be blocked by this function, by default True. Note that Dataset().datasources may not be updated immediately if sync=False

  • run_checks (bool, default: True) – Whether we should run datasource checks. Recommended for safety, by default True

Returns:

If sync is True, returns the integer UID of the datasource. If sync is False, returns a job ID that can be monitored with sf.poll_job_id

Return type:

Union[str, int]

create_label_schema(name, data_type, task_type, label_map, multi_label=False, description=None, label_column=None, label_descriptions=None, primary_field=None)

Create a label schema associated with this dataset.

This is the recommended entrypoint for creating label schemas.

Parameters:
  • name (str) – The name of the label schema

  • data_type (str) – The data type of the label schema

  • task_type (str) – The task type of the label schema

  • label_map (Union[Dict[str, int], List[str]]) – A dictionary mapping label names to their integer values, or a list of label names

  • multi_label (bool, default: False) – Whether the label schema is a multi-label schema, by default False

  • description (Optional[str], default: None) – A description of the label schema, by default None

  • label_column (Optional[str], default: None) – The name of the column that contains the labels, by default None

  • label_descriptions (Optional[Dict[str, str]], default: None) – A dictionary mapping label names to their descriptions, by default None

  • primary_field (Optional[str], default: None) – The primary field of the label schema, by default None

Returns:

The label schema object

Return type:

LabelSchema

classmethod delete(dataset, force=False)

Delete a dataset based on the provided identifier

The operation will fail if any applications use this Dataset

Examples

>>> from snorkelflow.sdk import Dataset
>>> Dataset.delete("my-dataset")
Successfully deleted dataset my-dataset with UID 0.
Parameters:
  • dataset (Union[str, int]) – Name or UID of the dataset to delete

  • force (bool, default: False) – If True, delete any applications using the Dataset as well

Return type:

None

delete_datasource(datasource_uid, force=False, sync=True)

Delete a data source. Calling delete_datasource will fully remove the data source from the dataset.

warning
To avoid breaking downstream applications, the operation will not be permitted if any applications are using the data source. If you are sure you want to delete the data source, use the flag force=True to override this check. This function may take a while.

Examples

>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> my_dataset.delete_datasource(1)
Successfully deleted datasource with UID 1.
Parameters:
  • datasource_uid (int) – UID of the data source to delete. See all datasources for this dataset by viewing self.datasources.

  • force (bool, default: False) – If True, force deletion of the data source even if it has dependent assets (ground truth, annotations, etc.), by default False

  • sync (bool, default: True) – Poll job status and block until complete, by default true

Returns:

Optionally returns job_id if sync mode is turned off

Return type:

Optional[str]

classmethod get(dataset)

Fetches an already-created Dataset from Snorkel Flow and returns a Dataset object that can be used to interact with files and data

Examples

>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
Successfully retrieved dataset my-dataset with UID 0 in workspace 0.
Parameters:

dataset (Union[str, int]) – Either the name or UID of the dataset. A list of all accessible datasets can be retrieved with Dataset.list()

Returns:

A Dataset object that can be used to interact with files and data in Snorkel Flow.

Return type:

Dataset

get_dataframe(split=None, max_rows=10, target_columns=None, datasource_uid=None, use_source_index=True)

Read the Dataset’s data into an in-memory Pandas DataFrame. If only a subset of columns are required, they can be specified with target_columns. Note that changes to the DataFrame will not be reflected in the Dataset. To change the actual data in the dataset, you must swap out the relevant data sources.

note
By default, only 10 rows are read for memory safety. This limit can be increased by setting max_rows to a larger value, but this can be computationally intensive and may lead to unstable behavior.
note
By default, we will return the original index column name the data source was uploaded with. However, certain SDK workflows might require an internal representation of the index column, such as the snorkelflow.sdk.Deployment.execute function. If you run into issues because of this, run dataset.get_dataframe with the use_source_index parameter set to False.

Examples

>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> df = my_dataset.get_dataframe(target_columns=["a", "b"])
<pd.DataFrame object with 10 rows and columns a, b>
>>> df = my_dataset.get_dataframe(datasource_uid=0, max_rows=None)
<pd.DataFrame object with 100 rows and columns a, b, c>
Parameters:
  • split (Optional[str], default: None) – The data split to load, by default None (all splits). Other options are “train”, “valid”, and “test”.

  • max_rows (Optional[int], default: 10) – The maximum number of rows to read, by default 10. Use max_rows=None to fetch all rows. Warning: setting this to a large value can be computationally intensive and may lead to unstable behavior

  • target_columns (Optional[List[str]], default: None) – A list of desired data columns, in case not all columns are required, by default None

  • datasource_uid (Optional[int], default: None) – Fetch a dataframe from a particular datasource_uid. A list of all datasource UIDs can be retrieved with Dataset().datasources

  • use_source_index (bool, default: True) – If true, returns the index column that the data source was originally uploaded with. If false, returns the Snorkel Flow internal column name. True by default.

Returns:

A Pandas DataFrame object displaying the data in this dataset

Return type:

pd.DataFrame

static list()

Get a list of all Datasets. The returned list includes the Dataset UID, the Dataset name, and additional metadata used to keep track of the Dataset’s properties.

Examples

>>> Dataset.list()
[
{
"name": "test-csv-str",
"uid": 116,
"datasources": []
},
...
]
Returns:

List of all dataset objects

Return type:

List[Dataset]

update(name='')

Update the metadata for this dataset. Only updating the name of this Dataset is currently supported. The new name for the dataset must be unique within the workspace.

Examples

>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get(dataset="my-dataset")
>>> my_dataset.update(name="my-new-dataset")
Successfully renamed dataset with UID 0 to my-new-dataset
Parameters:

name (str, default: '') – The new name for this dataset

Returns:

Confirmation string if this operation was successful

Return type:

str

update_datasource_data(old_datasource_uid, new_data, sync=True)

This function allows you to replace the data of an existing data source with new data. This function can be used if you find an error in an existing value in a data source, or if you need to update values due to changes in your upstream data pipeline. This function requires that all row indexes in the new data source match the row indexes of the old data source. Additionally, all columns must have the same name and the same type.

If your goal is to change the number of columns, the number of rows, or the type of a column, you should consider using an Operator instead.
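The matching requirements described above can be checked locally before attempting a replacement. This is a rough sketch in Pandas, not the checks the platform itself runs; the function name `replacement_problems` and the sample frames are hypothetical.

```python
import pandas as pd


def replacement_problems(old_df, new_df, uid_col):
    """Return a list of reasons why new_df cannot stand in for old_df."""
    problems = []
    missing = [c for c in old_df.columns if c not in new_df.columns]
    extra = [c for c in new_df.columns if c not in old_df.columns]
    if missing:
        problems.append(f"columns missing from new data: {missing}")
    if extra:
        problems.append(f"unexpected columns in new data: {extra}")
    # Columns present in both frames must keep the same type.
    for col in old_df.columns:
        if col in new_df.columns and old_df[col].dtype != new_df[col].dtype:
            problems.append(f"column {col!r} changed type")
    # The set of row identifiers must be identical.
    if uid_col in old_df.columns and uid_col in new_df.columns:
        if set(old_df[uid_col]) != set(new_df[uid_col]):
            problems.append(f"values in {uid_col!r} do not match")
    return problems


old = pd.DataFrame({"uid": [0, 1], "path": ["bad_path.pdf", "ok.pdf"]})
fixed = old.copy()
fixed.loc[0, "path"] = "good_path.pdf"  # edit one value, keep rows and columns
shorter = old.iloc[:1]                  # drops a row, so the UIDs no longer match

print(replacement_problems(old, fixed, "uid"))
print(replacement_problems(old, shorter, "uid"))
```

Here `old` stands in for a frame fetched with `get_dataframe(datasource_uid=..., max_rows=None)`, and only a frame that yields an empty list would be a candidate for `update_datasource_data`.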

warning
This is a potentially dangerous operation, and may take a while to run. For safety, this will always run data source checks on the new data source. Applications and models that use the data source being replaced may become temporarily unavailable as computations are re-run over the new data, and might report different behavior. If you are unsure how to use this function, contact a Snorkel representative.

Examples

>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> my_dataset.datasources
[{"datasource_uid": 1, "datasource_name": "test.csv", "split": "train"}]
>>> df = my_dataset.get_dataframe(datasource_uid=1, max_rows=None)
>>> df
| | a | b | c |
| 0| 1 | 0 | bad_path.pdf|
>>> df.iloc[0, 2] = "good_path.pdf"
>>> my_dataset.update_datasource_data(1, df)
Successfully replaced data in datasource with UID 1.
Parameters:
  • old_datasource_uid (int) – The UID of the data source you want to swap out. You can see a list of all data sources for this dataset by viewing self.datasources.

  • new_data (Union[str, DataFrame]) – Either (1) a path to a file that is in the local filesystem or accessible via an S3-compatible API, or (2) a Pandas DataFrame. The columns and UIDs of the new data must exactly match those of the data being replaced. Use dataset.get_dataframe(datasource_uid=old_datasource_uid) to see the existing data.

  • sync (bool, default: True) – Poll job status and block until all jobs are complete, by default True

Returns:

Returns a Job ID that can be polled if sync is False. Otherwise returns None

Return type:

Optional[str]

Raises:

ValueError – If the data provided is neither a valid file path nor a valid Pandas DataFrame

update_datasource_split(datasource_uid, split)

Change the split of a data source that has already been uploaded to the dataset. This will impact how the data source is used in all future applications.

warning
This will only impact the Dataset’s data source, and not existing derived data sources. To change the split within applications that have already been created, find the node’s derived data source UID by clicking on “Develop” > “View Data Sources” in the Snorkel Flow UI and use the sf.update_datasource function.

Examples

>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> my_dataset.datasources
[{"datasource_uid": 1, "datasource_name": "test.csv", "split": "train"}]
>>> my_dataset.update_datasource_split(1, "valid")
[123, 456, 789]
Parameters:
  • datasource_uid (int) – The integer UID corresponding to the data source you wish to update. You can see a list of all data sources for this dataset by viewing self.datasources.

  • split (str) – The new split to assign to this data source. Must be one of “train”, “test”, or “valid”.

Returns:

Returns a list of model nodes that have been impacted by changing the split.

Return type:

List[int]

property batches: List[Batch]

A list of batches belonging to this Dataset.

property datasources: List[Dict[str, Any]]

A list of data sources and associated metadata belonging to this Dataset.

property label_schemas: List[LabelSchema]

A list of label schemas belonging to this Dataset.

property mta_enabled: bool

Whether or not multi-task annotation is enabled for this dataset.

property name: str

The human-readable name of the dataset.

property uid: int

The unique integer identifier for the dataset within Snorkel Flow.