snorkelflow.sdk.Dataset
- class snorkelflow.sdk.Dataset(name, uid, mta_enabled)
Bases: object
The Dataset object represents a dataset in Snorkel Flow.
Datasets Quickstart
In this quickstart, we will create a Dataset and upload a file to that Dataset as a data source. We will then show how you might go about ingesting that data into the platform.
We will need the following imports:
from snorkelflow.sdk import Dataset
import snorkelflow.client as sf
import pandas as pd
ctx = sf.SnorkelFlowContext.from_kwargs()
We will begin by creating a new Dataset.
>>> contracts_dataset = Dataset.create("contracts-dataset")
Successfully created dataset contracts-dataset with UID 0 in workspace 0
Next, we will attempt to save a file to the Dataset as a data source. This file is stored in S3. The upload will initially fail because the file contains null values.
>>> contracts_dataset.create_datasource("s3://snorkel-contracts-dataset/dev.parquet", uid_col="uid", split="train")
UserInputError: Errors...
In this particular example, we decide we don’t care about these rows, so we can use Pandas to remove the null values from the file. We can then re-upload the data, this time passing the DataFrame directly without needing to save it to a file again. In other cases, you may want to either edit those null cells or fix them in your upstream data pipeline.
>>> df = pd.read_parquet("s3://snorkel-contracts-dataset/dev.parquet")
>>> df = df.dropna()
>>> contracts_dataset.create_datasource(df, uid_col="uid", split="train")
+0.07s Starting data ingestion
+1.85s Ingesting data
+2.05s Data ingestion complete
To verify that this worked correctly, we can view this Dataset’s data sources.
>>> contracts_dataset.datasources
[{'datasource_uid': 668,...}]
Dataset Concepts
Datasets
Datasets are how your data is represented in Snorkel Flow. Snorkel Flow projects always begin with a single Dataset. Datasets bring external data into Snorkel Flow and help manage that data once it has been ingested. A Dataset is composed of individual chunks of data, called data sources, and provides an interface for managing those data sources.
Data Sources
Data sources are the individual chunks of data that make up a Dataset. A data source can be a file you upload from local storage, a file located in a remote (S3, MinIO, etc.) storage service, or an in-memory Pandas DataFrame. Data sources shouldn’t be touched directly, but should be managed by interfacing with their parent Dataset. The best way to deal with data sources is to treat them as blocks of data, which can be added and removed but only occasionally changed. Data sources can be given names during their creation, but are usually referred to using a data source UID, an integer ID assigned to each data source when it is created.
Derived Data Sources
When an application is created using a dataset, Snorkel Flow will create a derived data source for each data source in the dataset. Derived data sources are intermediate representations of data that track the lineage of the data as it is being processed and are associated with only one application. Note that some operations, such as changing the split of a data source, don’t propagate to the derived data sources once they are created, and vice versa. Derived data sources are viewable in the Snorkel Flow UI via the “View Data Sources” button, accessible from the “Develop” screen of an application.
Modifying Data
In general, data sources should be treated as immutable. This means that you should avoid modifying the underlying data source once it has been uploaded. If your goal is to filter out rows, add feature columns, or remove feature columns, you should use an Operator to do so. Alternatively, you can modify your data upstream of Snorkel and create a new Dataset with your edited data.
The Python SDK provides limited support for specific one-off operations on data sources. Sometimes you might need to reformat the data in an existing column to make it compatible with processing logic. In this case, you can use the dataset.update_datasource_data method to swap out an existing data source for a new one with the updated data. Be aware, however, that this is an irreversible change, and updating data this way is an expensive operation that requires all downstream applications to be refreshed; a minimal sketch follows.
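The sketch below illustrates that workflow, assuming a data source with UID 1 and a column named "date" that needs reformatting (both placeholders):
>>> import pandas as pd
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> df = my_dataset.get_dataframe(datasource_uid=1, max_rows=None)  # pull the full data source
>>> df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d")  # fix the column format in place
>>> my_dataset.update_datasource_data(1, df)  # irreversible; downstream applications will be refreshed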
Splits
Data sources belong to splits. Splits help dictate how the data will be used in the model development process. Data sources allocated to the train split will be used for model training and labeling function development. Data sources allocated to the valid split will be used to validate models iteratively and to perform error analysis. Data sources allocated to the test split will be used to evaluate the final model. Data source splits may be updated as needed, but be aware that model metrics and labeling function performance will change based on how the splits are allocated.
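For example, a data source can be moved to a different split with update_datasource_split (documented below). A minimal sketch, assuming a data source with UID 1 (a placeholder):
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> my_dataset.update_datasource_split(1, "valid")  # reassign data source 1 to the valid split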
Data Upload Guardrails
When you upload data to Snorkel Flow, it must pass a series of safety checks to ensure that the data is valid and safe to load into the platform. These checks include the following (a rough pre-upload sanity check is sketched after the list):
- Number of rows: A single data source should not exceed 10 million rows. If your data source exceeds this limit, split it into multiple data sources before uploading.
- Column memory: The average memory usage per column must be under 20MB across all columns in your data source, and for good performance it should be under 5MB. If your data source exceeds this limit, split it into multiple data sources before uploading.
- Null values: Snorkel Flow will not permit data to be uploaded if the data source contains any null values. If you have null values in your data, clean them up with the Pandas fillna() or dropna() methods before uploading.
- Unique integer index: Each data source must have an integer index column whose values are unique, non-negative integers and are not duplicated across any data source in the Dataset. If your data does not already have this stable index column, you must create one before uploading.
- Consistent schema: All data sources in a single Dataset should have the same columns, and columns that appear in multiple data sources must have the same type. If some columns exist in some data sources but not others, you may see unexpected behavior in downstream tasks.
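The sketch below is a rough, client-side approximation of these checks using Pandas. The check_before_upload helper is hypothetical (not part of the SDK), and the platform’s own validation remains authoritative.
import pandas as pd

def check_before_upload(df: pd.DataFrame, uid_col: str) -> None:
    # Hypothetical helper mirroring the upload guardrails described above.
    if len(df) > 10_000_000:
        raise ValueError("Data source exceeds 10 million rows; split it into multiple data sources.")
    # Average per-column memory usage must stay under 20MB (ideally under 5MB).
    if df.memory_usage(deep=True, index=False).mean() > 20 * 1024 * 1024:
        raise ValueError("Average column memory exceeds 20MB; split the data source.")
    # Null values are rejected at upload time; clean them with fillna() or dropna() first.
    if df.isna().any().any():
        raise ValueError("Data source contains null values.")
    # The UID column must hold unique, non-negative integers.
    uids = df[uid_col]
    if not (pd.api.types.is_integer_dtype(uids) and uids.is_unique and (uids >= 0).all()):
        raise ValueError("UID column must contain unique, non-negative integers.")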
Fetching UIDs
Methods in the Dataset class sometimes require a UID parameter. This is the unique identifier for the Dataset within Snorkel Flow. The Dataset UID can be retrieved by calling .uid on a Dataset object. Data source methods sometimes require a data source UID, which can be retrieved by printing out the data sources with my_dataset.datasources; the data source UID is the datasource_uid field in each returned dictionary.
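A minimal sketch, assuming an existing Dataset named "my-dataset" (a placeholder):
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> my_dataset.uid                                           # integer Dataset UID
>>> [ds["datasource_uid"] for ds in my_dataset.datasources]  # integer data source UIDs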
- __init__(name, uid, mta_enabled)
Create a dataset object in-memory with the necessary properties. This constructor should not be called directly; use the create() and get() methods instead.
Parameters
- name (str): The human-readable name of the dataset. Must be unique within the workspace.
- uid (int): The unique integer identifier for the dataset within Snorkel Flow.
- mta_enabled (bool): Whether or not multi-task annotation is enabled for this dataset.
Methods
- __init__(name, uid, mta_enabled): Create a dataset object in-memory with necessary properties.
- create(dataset_name[, enable_mta]): Creates and registers a new Dataset object.
- create_batches([name, assignees, ...]): Create annotation batches for this dataset.
- create_datasource(data, uid_col[, name, ...]): Creates a new data source within the Dataset from either a filepath or a Pandas DataFrame.
- create_label_schema(name, data_type, ...[, ...]): Create a label schema associated with this dataset.
- delete(dataset[, force]): Delete a dataset based on the provided identifier.
- delete_datasource(datasource_uid[, force, sync]): Delete a data source.
- get(dataset): Fetches an already-created Dataset from Snorkel Flow and returns a Dataset object that can be used to interact with files and data.
- get_dataframe([split, max_rows, ...]): Read the Dataset's data into an in-memory Pandas DataFrame.
- list(): Get a list of all Datasets.
- update([name]): Update the metadata for this dataset.
- update_datasource_data(old_datasource_uid, ...): Replace the data of an existing data source with new data.
- update_datasource_split(datasource_uid, split): Change the split of a data source that has already been uploaded to the dataset.
Attributes
- batches: A list of batches belonging to this Dataset.
- datasources: A list of data sources and associated metadata belonging to this Dataset.
- label_schemas: A list of label schemas belonging to this Dataset.
- mta_enabled: Whether or not multi-task annotation is enabled for this dataset.
- name: The human-readable name of the dataset.
- uid: The unique integer identifier for the dataset within Snorkel Flow.
- classmethod create(dataset_name, enable_mta=True)
Creates and registers a new Dataset object. A Dataset object organizes and collects files and other sources of data for use in Snorkel Flow. A Dataset is restricted to a particular workspace, so only users in that workspace will be able to access that Dataset. Datasets must be initialized with a unique name.
Examples
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.create(dataset_name="my-dataset")
Successfully created dataset my-dataset with UID 0 in workspace 0
Parameters
- dataset_name (str): A name for the Dataset. This name must be unique within the workspace.
- enable_mta (bool, default True): Whether to enable multi-task annotation for this dataset. Enabled by default.
Returns
A Dataset object that can be used to interact with the dataset in Snorkel Flow
Return type
Dataset
- create_batches(name=None, assignees=None, label_schemas=None, batch_size=None, num_batches=None, randomize=False, random_seed=123, selection_strategy=None, split=None, x_uids=None, filter_by_x_uids_not_in_batch=False, divide_x_uids_evenly_to_assignees=False)
Create annotation batches for this dataset.
This is the recommended entrypoint for creating batches.
Parameters
- name (Optional[str], default None): The name of the batch.
- assignees (Optional[List[int]], default None): The user UIDs for the assignees of the batches.
- label_schemas (Optional[List[LabelSchema]], default None): The label schemas assigned for the batches.
- batch_size (Optional[int], default None): The size of the batches.
- num_batches (Optional[int], default None): The number of batches.
- randomize (Optional[bool], default False): Whether to randomize the batches.
- random_seed (Optional[int], default 123): The seed for the randomization.
- selection_strategy (Optional[SelectionStrategy], default None): The SelectionStrategy for the batches.
- split (Optional[str], default None): The split (“train”, “test”, or “valid”) of the batches.
- x_uids (Optional[List[str]], default None): A list of datapoint UIDs to create batches from.
- filter_by_x_uids_not_in_batch (Optional[bool], default False): Whether to create batches with datapoints not in a batch.
- divide_x_uids_evenly_to_assignees (Optional[bool], default False): Whether to divide the datapoints evenly among the provided assignees.
Returns
The list of created batches
Return type
List[Batch]
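The call below is a hedged sketch, assuming the dataset already has at least one label schema and that user UID 1 is a valid annotator in your instance (both assumptions):
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> schema = my_dataset.label_schemas[0]          # assumes a label schema already exists
>>> batches = my_dataset.create_batches(
...     name="annotation-round-1",
...     assignees=[1],                            # placeholder user UID
...     label_schemas=[schema],
...     batch_size=50,
...     split="train",
...     randomize=True,
... )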
- create_datasource(data, uid_col, name=None, split='train', sync=True, run_checks=True)
Creates a new data source within the Dataset from either a filepath or a Pandas DataFrame.
If you provide a filepath: A file can be a CSV or Parquet file that either exists in the local filesystem or is accessible via an S3-compatible API (such as MinIO or AWS S3). Files must have a stable integer index column that is unique across all data sources in the dataset.
If you provide a DataFrame: The DataFrame must have a unique integer column that does not contain duplicates across other sources of data. All DataFrame column names must be strings.
The data must pass all validation checks to be registered as a valid data source. If a DataFrame fails to pass all data validation checks, the upload will fail and the data source will not be registered.
Examples
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> my_dataset.create_datasource("my_data.csv", uid_col="id", split="train")
+0.07s Starting data ingestion
+1.85s Ingesting data
+2.05s Data ingestion complete
1 # UID of the datasource
>>> my_dataset.create_datasource(df, uid_col="id", name="dataframe-data", split="train")
+0.07s Starting data ingestion
+1.85s Ingesting data
+2.05s Data ingestion complete
1
Parameters
- data (Union[str, DataFrame]): Either a path to a file in the local filesystem or in an S3-compatible store, or a Pandas DataFrame. If a filepath is not provided, a DataFrame must be provided instead, and vice versa.
- uid_col (str): Name of the UID column for this data. The values in this column must be unique non-negative integers that are not duplicated across files.
- name (Optional[str], default None): The name to give this data source. If not provided, the name of the file will be used. Adding a name is strongly recommended when uploading a DataFrame.
- split (Optional[str], default 'train'): The name of the data split this data belongs to, by default Splits.train.
- sync (Optional[bool], default True): Whether execution should be blocked by this function, by default True. Note that Dataset().datasources may not be updated immediately if sync=False.
- run_checks (bool, default True): Whether we should run data source checks. Recommended for safety, by default True.
Returns
If sync is True, returns the integer UID of the data source. If sync is False, returns a job ID that can be monitored with sf.poll_job_id.
Return type
Union[str, int]
- create_label_schema(name, data_type, task_type, label_map, multi_label=False, description=None, label_column=None, label_descriptions=None, primary_field=None)
Create a label schema associated with this dataset.
This is the recommended entrypoint for creating label schemas.
Parameters
- name (str): The name of the label schema.
- data_type (str): The data type of the label schema.
- task_type (str): The task type of the label schema.
- label_map (Union[Dict[str, int], List[str]]): A dictionary mapping label names to their integer values, or a list of label names.
- multi_label (bool, default False): Whether the label schema is a multi-label schema, by default False.
- description (Optional[str], default None): A description of the label schema, by default None.
- label_column (Optional[str], default None): The name of the column that contains the labels, by default None.
- label_descriptions (Optional[Dict[str, str]], default None): A dictionary mapping label names to their descriptions, by default None.
- primary_field (Optional[str], default None): The primary field of the label schema, by default None.
Returns
The label schema object
Return type
LabelSchema
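A hedged sketch of a call; the data_type and task_type values here ("text" and "classification") are illustrative placeholders, so substitute the values supported by your Snorkel Flow instance:
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> label_schema = my_dataset.create_label_schema(
...     name="sentiment",
...     data_type="text",                  # placeholder value
...     task_type="classification",        # placeholder value
...     label_map={"negative": 0, "positive": 1},
...     description="Document-level sentiment labels",
... )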
- classmethod delete(dataset, force=False)
Delete a dataset based on the provided identifier.
The operation will fail if any applications use this Dataset.
Examples
>>> from snorkelflow.sdk import Dataset
>>> Dataset.delete("my-dataset")
Successfully deleted dataset my-dataset with UID 0.
- delete_datasource(datasource_uid, force=False, sync=True)
Delete a data source. Calling delete_datasource will fully remove the data source from the dataset.
Warning: To avoid breaking downstream applications, the operation will not be permitted if any applications are using the data source. If you are sure you want to delete the data source, use the flag force=True to override this check. This function may take a while.
Examples
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> my_dataset.delete_datasource(1)
Successfully deleted datasource with UID 1.
Parameters
- datasource_uid (int): UID of the data source to delete. See all data sources for this dataset by viewing self.datasources.
- force (bool, default False): Force deletion of a data source even if that data source has dependent assets (ground truth, annotations, etc.), by default False.
- sync (bool, default True): Poll job status and block until complete, by default True.
Returns
Optionally returns job_id if sync mode is turned off
Return type
Optional[str]
- classmethod get(dataset)
Fetches an already-created Dataset from Snorkel Flow and returns a Dataset object that can be used to interact with files and data.
Examples
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
Successfully retrieved dataset my-dataset with UID 0 in workspace 0.
- get_dataframe(split=None, max_rows=10, target_columns=None, datasource_uid=None, use_source_index=True)
Read the Dataset’s data into an in-memory Pandas DataFrame. If only a subset of columns is required, they can be specified with target_columns. Note that changes to the DataFrame will not be reflected in the Dataset. To change the actual data in the dataset, you must swap out the relevant data sources.
Note: By default, only 10 rows are read for memory safety. This limit can be increased by setting max_rows to a larger value, but this can be computationally intensive and may lead to unstable behavior.
Note: By default, we will return the original index column name the data source was uploaded with. However, certain SDK workflows, such as the snorkelflow.sdk.Deployment.execute function, might require an internal representation of the index column. If you run into issues because of this, run dataset.get_dataframe with the use_source_index parameter set to False.
Examples
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> df = my_dataset.get_dataframe(target_columns=["a", "b"])
<pd.DataFrame object with 10 rows and columns a, b>
>>> df = my_dataset.get_dataframe(datasource_uid=0, max_rows=None)
<pd.DataFrame object with 100 rows and columns a, b, c>
Parameters
- split (Optional[str], default None): The data split to load, by default None (all splits). Other options are “train”, “valid”, and “test”.
- max_rows (Optional[int], default 10): The maximum number of rows to read, by default 10. Use max_rows=None to fetch all rows. Warning: setting this to a large value can be computationally intensive and may lead to unstable behavior.
- target_columns (Optional[List[str]], default None): A list of desired data columns, in case not all columns are required, by default None.
- datasource_uid (Optional[int], default None): Fetch a DataFrame from a particular datasource_uid. A list of all data source UIDs can be retrieved with Dataset().datasources.
- use_source_index (bool, default True): If True, returns the index column that the data source was originally uploaded with. If False, returns the Snorkel Flow internal column name. True by default.
Returns
A Pandas DataFrame object displaying the data in this dataset
Return type
pd.DataFrame
- static list()
Get a list of all Datasets. The returned list includes the Dataset UID, the Dataset name, and additional metadata used to keep track of the Dataset’s properties.
Examples
>>> Dataset.list()
[
{
"name": "test-csv-str",
"uid": 116,
"datasources": []
},
...
]
- update(name='')
Update the metadata for this dataset. Only updating the name of this Dataset is currently supported. The new name for the dataset must be unique within the workspace.
Examples
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get(dataset="my-dataset")
>>> my_dataset.update(name="my-new-dataset")
Successfully renamed dataset with UID 0 to my-new-dataset
- update_datasource_data(old_datasource_uid, new_data, sync=True)
This function allows you to replace the data of an existing data source with new data. This function can be used if you find an error in an existing value in a data source, or if you need to update values due to changes in your upstream data pipeline. This function requires that all row indexes in the new data source match the row indexes of the old data source. Additionally, all columns must have the same name and the same type.
If your goal is to change the number of columns, the number of rows, or the type of a column, you should consider using an Operator instead.
Warning: This is a potentially dangerous operation and may take a while to run. For safety, data source checks will always run on the new data source. Applications and models that use the data source being replaced may become temporarily unavailable as computations are re-run over the new data, and might report different behavior. If you are unsure how to use this function, contact a Snorkel representative.
Examples
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> my_dataset.datasources
[{"datasource_uid": 1, "datasource_name": "test.csv", "split": "train"}]
>>> df = my_dataset.get_dataframe(datasource_uid=1, max_rows=None)
>>> df
| | a | b | c |
| 0| 1 | 0 | bad_path.pdf|
>>> df.iloc[0, 2] = "good_path.pdf"
>>> my_dataset.update_datasource_data(1, df)
Successfully replaced data in datasource with UID 1.
Parameters
- old_datasource_uid (int): The UID of the data source you want to swap out. You can see a list of all data sources for this dataset by viewing self.datasources.
- new_data (Union[str, DataFrame]): Either a path to a file in the local filesystem or in an S3-compatible store, or a Pandas DataFrame. If a filepath is not provided, a DataFrame must be provided instead, and vice versa. The columns and UIDs of the new data must exactly match those of the data being replaced. Use dataset.get_dataframe(datasource_uid=old_datasource_uid) to see the existing data.
- sync (bool, default True): Poll job status and block until all jobs are complete, by default True.
Returns
Returns a Job ID that can be polled if sync is False. Otherwise returns None
Return type
Optional[str]
Raises
ValueError – If the data provided is neither a valid file path nor a valid Pandas DataFrame
- update_datasource_split(datasource_uid, split)
Change the split of a data source that has already been uploaded to the dataset. This will impact how the data source is used in all future applications.
Warning: This will only impact the Dataset’s data source, not existing derived data sources. To change the split within applications that have already been created, find the node’s derived data source UID by clicking on “Develop” > “View Data Sources” in the Snorkel Flow UI and use the sf.update_datasource function.
Examples
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> my_dataset.datasources
[{"datasource_uid": 1, "datasource_name": "test.csv", "split": "train"}]
>>> my_dataset.update_datasource_split(1, "train")
[123, 456, 789]
Parameters
- datasource_uid (int): The integer UID corresponding to the data source you wish to update. You can see a list of all data sources for this dataset by viewing self.datasources.
- split (str): The new split to assign to this data source. Must be one of “train”, “test”, or “valid”.
Returns
Returns a list of model nodes that have been impacted by changing the split.
Return type
List[int]
- property batches: List[Batch]
A list of batches belonging to this Dataset.
- property datasources: List[Dict[str, Any]]
A list of data sources and associated metadata belonging to this Dataset.
- property label_schemas: List[LabelSchema]
A list of label schemas belonging to this Dataset.
- property mta_enabled: bool
Whether or not multi-task annotation is enabled for this dataset.
- property name: str
The human-readable name of the dataset.
- property uid: int
The unique integer identifier for the dataset within Snorkel Flow.
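A brief sketch of reading these properties on an existing Dataset (the name "my-dataset" is a placeholder):
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> my_dataset.mta_enabled      # bool: whether multi-task annotation is enabled
>>> my_dataset.label_schemas    # list of LabelSchema objects
>>> my_dataset.batches          # list of Batch objects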