snorkelflow.sdk.Dataset
- class snorkelflow.sdk.Dataset(name, uid, mta_enabled)
Bases: object
The Dataset object represents a dataset in Snorkel Flow.
Datasets Quickstart
In this quickstart, we will create a Dataset and upload a file to that Dataset as a data source. We will then show how you might go about ingesting that data into the platform.
We will need the following imports:
from snorkelflow.sdk import Dataset
import snorkelflow.client as sf
import pandas as pd
ctx = sf.SnorkelFlowContext.from_kwargs()
We will begin by creating a new Dataset.
>>> contracts_dataset = Dataset.create("contracts-dataset")
Successfully created dataset contracts-dataset with UID 0 in workspace 0
Next, we will attempt to save a file to the Dataset as a data source. This file will be in S3. File upload will initially fail because this file contains null values.
>>> contracts_dataset.create_datasource("s3://snorkel-contracts-dataset/dev.parquet", uid_col="uid", split="train")
UserInputError: Errors...
In this particular example, we decide we don’t care about these rows, so we can use Pandas to drop the rows that contain null values. We can then re-upload the data, this time uploading the DataFrame directly without needing to save it to a file again. In other cases, you may want to either edit those null cells or fix them in your upstream data pipeline.
>>> df = pd.read_parquet("s3://snorkel-contracts-dataset/dev.parquet")
>>> df = df.dropna()
>>> contracts_dataset.create_datasource(df, uid_col="uid", split="train")
+0.07s Starting data ingestion
+1.85s Ingesting data
+2.05s Data ingestion complete
To verify that the upload worked correctly, we can view this Dataset’s data sources.
>>> contracts_dataset.datasources
[{'datasource_uid': 668,...}]
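Before choosing between dropping and filling null values, it can help to see where they are. The following is a minimal sketch using only Pandas. The small DataFrame stands in for the contents of dev.parquet, and its column names and fill values are illustrative assumptions, not taken from the real file.

```python
import pandas as pd

# Hypothetical stand-in for the contents of dev.parquet.
df = pd.DataFrame(
    {
        "uid": [0, 1, 2, 3],
        "text": ["lease", None, "nda", "msa"],
        "pages": [3.0, 12.0, None, 7.0],
    }
)

# Count nulls per column to see which columns are affected.
null_counts = df.isna().sum()
print(null_counts[null_counts > 0])

# Option 1: drop every row that contains at least one null.
dropped = df.dropna()

# Option 2: keep all rows and fill nulls with per-column placeholders.
filled = df.fillna({"text": "", "pages": 0.0})

print(len(dropped), len(filled))
```

Either resulting frame contains no nulls, so either could be passed to create_datasource as in the quickstart above.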
Dataset Concepts
Datasets
Datasets are how your data is represented in Snorkel Flow. Snorkel Flow projects always begin with a single Dataset. Datasets bring external data into Snorkel Flow and help manage that data once it has been ingested. Datasets are composed of individual chunks of data, called data sources, and provide an interface for managing those data sources.
Data Sources
Data sources are the individual chunks of data that make up a Dataset. A data source can be a file you upload from local storage, a file located in a remote (S3, MinIO, etc.) storage service, or an in-memory Pandas DataFrame. Data sources shouldn’t be touched directly, but should be managed by interfacing with their parent Dataset. The best way to deal with data sources is to treat them as blocks of data, which can be added and removed but only occasionally changed. Data sources can be given names during their creation, but are usually referred to using a data source UID, an integer ID assigned to each data source when it is created.
Derived Data Sources
When an application is created using a dataset, Snorkel Flow will create a derived data source for each data source in the dataset. Derived data sources are intermediate representations of data that track the lineage of the data as it is being processed, and each one is associated with only one application. Note that some operations, such as changing the split of a data source, don’t propagate to the derived data sources once they have been derived, and vice versa. Derived data sources are viewable in the Snorkel Flow UI via the “View Data Sources” button, accessible from the “Develop” screen of an application.
Modifying Data
In general, data sources should be treated as immutable. This means that you should avoid modifying the underlying data source once it has been uploaded. If your goal is to filter out rows, add feature columns, or remove feature columns, you should use an Operator to do so. Alternatively, you can modify your data upstream of Snorkel and create a new Dataset with your edited data.
The Python SDK provides limited support for specific one-off operations on data sources. Sometimes you might need to reformat the data in an existing column to make it compatible with processing logic. In this case, you can use the dataset.update_datasource_data method to swap out an existing data source for a new one with the updated data. However, be aware that this is an irreversible change, and updating data in this way is an expensive operation that will require all downstream applications to be refreshed.
Splits
Data sources belong to splits. Splits help dictate how the data will be used in the model development process. Data sources allocated to the train split will be used for model training and labeling function development. Data sources allocated to the valid split will be used to validate models iteratively and to perform error analysis. Data sources allocated to the test split will be used to evaluate the final model. Data source splits may be updated as needed, but be aware that model metrics and labeling function performance will change based on how the splits are allocated.
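Because a split is assigned per data source, a single table is typically divided into separate frames before upload, one per split. The following is a minimal sketch using only Pandas. The split_frame helper, the 80/10/10 proportions, and the seed are illustrative assumptions, not something Snorkel Flow prescribes.

```python
import pandas as pd


def split_frame(df, valid_frac=0.1, test_frac=0.1, seed=0):
    """Shuffle rows once, then slice them into train/valid/test frames."""
    shuffled = df.sample(frac=1.0, random_state=seed)
    n_valid = int(len(df) * valid_frac)
    n_test = int(len(df) * test_frac)
    return {
        "valid": shuffled.iloc[:n_valid],
        "test": shuffled.iloc[n_valid:n_valid + n_test],
        "train": shuffled.iloc[n_valid + n_test:],
    }


# Hypothetical data: 20 rows with a unique integer "uid" column.
df = pd.DataFrame({"uid": range(20), "text": [f"doc {i}" for i in range(20)]})
frames = split_frame(df)
for split, frame in frames.items():
    print(split, len(frame))
```

Each frame could then be uploaded as its own data source with the matching split value, for example contracts_dataset.create_datasource(frames["valid"], uid_col="uid", split="valid").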
Data Upload Guardrails
When you upload data to Snorkel Flow, it must pass a series of safety checks to ensure that the data is valid and safe to load into the platform. These checks include:
Number of rows: A single data source should not exceed 10 million rows. If your data source exceeds this limit, you should split it into multiple data sources before uploading.
Column memory: The average memory usage of a single column must be under 20MB across all columns in your data source. For performance, the average column memory usage should be under 5MB. If your data source exceeds this limit, you should split it into multiple data sources before uploading.
Null values: Snorkel Flow will not permit data to be uploaded if any null values exist in that data source. If you have null values in your data, you might want to clean them up with the Pandas fillna() method before uploading.
Unique integer index: Snorkel Flow requires that each data source have a unique integer index column. The values in this index must be non-negative integers and must be unique among all data sources in the Dataset. If your Dataset does not already have this stable index column, you must create one before uploading.
Consistent schema: All data sources in a single Dataset should have the same columns. All columns that are in multiple data sources must have the same type. If you have columns that exist in some data sources but not others, you may see unexpected behavior in downstream tasks.
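Several of these checks can be approximated locally before an upload is attempted. The sketch below uses only Pandas. The preflight_check helper is hypothetical and is not part of the SDK, the memory figure relies on Pandas' own accounting (which may differ from what the platform measures), and "20MB" is read here as 20 MiB. Uniqueness across data sources and schema consistency cannot be verified from a single frame and are not covered.

```python
import pandas as pd

MAX_ROWS = 10_000_000                   # row limit from the list above
MAX_AVG_COLUMN_BYTES = 20 * 1024 ** 2   # "20MB", read here as 20 MiB (assumption)


def preflight_check(df, uid_col):
    """Return a list of problems found locally; an empty list means none."""
    problems = []

    if len(df) > MAX_ROWS:
        problems.append(f"{len(df)} rows exceeds the limit of {MAX_ROWS}")

    # Average bytes per column, by Pandas' own accounting.
    if len(df.columns) > 0:
        avg_bytes = df.memory_usage(index=False, deep=True).mean()
        if avg_bytes > MAX_AVG_COLUMN_BYTES:
            problems.append(f"average column memory is {avg_bytes:.0f} bytes")

    null_columns = [col for col in df.columns if df[col].isna().any()]
    if null_columns:
        problems.append(f"null values in columns: {null_columns}")

    if uid_col not in df.columns:
        problems.append(f"index column {uid_col!r} is missing")
        return problems

    uids = df[uid_col]
    if not pd.api.types.is_integer_dtype(uids):
        problems.append(f"{uid_col!r} is not an integer column")
    elif (uids < 0).any():
        problems.append(f"{uid_col!r} contains negative values")
    if uids.duplicated().any():
        problems.append(f"{uid_col!r} contains duplicate values")

    return problems


# Hypothetical frames for illustration.
clean = pd.DataFrame({"uid": [0, 1, 2], "text": ["a", "b", "c"]})
dirty = pd.DataFrame({"uid": [0, 0, -1], "text": ["a", None, "c"]})
print(preflight_check(clean, "uid"))
for problem in preflight_check(dirty, "uid"):
    print(problem)
```

Returning a list rather than raising on the first failure lets all problems be fixed in one pass. The platform's own checks remain the authority on whether an upload is accepted.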
Fetching UIDs
Methods in the Dataset class will sometimes require a UID parameter. This is the unique identifier for the Dataset within Snorkel Flow. The Dataset UID can be retrieved by accessing .uid on a Dataset object. Data source methods will sometimes require a data source UID, which can be retrieved by printing the data sources with my_dataset.datasources. The data source UID is the datasource_uid field in each returned dictionary.
- __init__(name, uid, mta_enabled)
Create a dataset object in-memory with necessary properties. This constructor should not be called directly; obtain Dataset objects through the create() and get() methods instead.
- Parameters:
name (str) – The human-readable name of the dataset. Must be unique within the workspace
uid (int) – The unique integer identifier for the dataset within Snorkel Flow
mta_enabled (bool) – Whether or not multi-task annotation is enabled for this dataset
Methods
__init__(name, uid, mta_enabled) – Create a dataset object in-memory with necessary properties.
create(dataset_name[, enable_mta]) – Creates and registers a new Dataset object.
create_batches([name, assignees, ...]) – Create annotation batches for this dataset.
create_datasource(data, uid_col[, name, ...]) – Creates a new data source within the Dataset from either a filepath or a Pandas DataFrame.
create_label_schema(name, data_type, ...[, ...]) – Create a label schema associated with this dataset.
delete(dataset[, force]) – Delete a dataset based on the provided identifier.
delete_datasource(datasource_uid[, force, sync]) – Delete a data source.
get(dataset) – Fetches an already-created Dataset from Snorkel Flow and returns a Dataset object that can be used to interact with files and data.
get_dataframe([split, max_rows, ...]) – Read the Dataset's data into an in-memory Pandas DataFrame.
list() – Get a list of all Datasets.
update([name]) – Update the metadata for this dataset.
update_datasource_data(old_datasource_uid, ...) – This function allows you to replace the data of an existing data source with new data.
update_datasource_split(datasource_uid, split) – Change the split of a data source that has already been uploaded to the dataset.
Attributes
batches – A list of batches belonging to this Dataset.
datasources – A list of data sources and associated metadata belonging to this Dataset.
label_schemas – A list of label schemas belonging to this Dataset.
mta_enabled – Whether or not multi-task annotation is enabled for this dataset.
name – The human-readable name of the dataset.
uid – The unique integer identifier for the dataset within Snorkel Flow.
- classmethod create(dataset_name, enable_mta=False)
Creates and registers a new Dataset object. A Dataset object organizes and collects files and other sources of data for use in Snorkel Flow. A Dataset is restricted to a particular workspace, so only users in that workspace will be able to access that Dataset. Datasets must be initialized with a unique name.
Examples
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.create(dataset_name="my-dataset")
Successfully created dataset my-dataset with UID 0 in workspace 0
- Parameters:
dataset_name (str) – A name for the Dataset. This name must be unique within the workspace
enable_mta (bool, default: False) – If True, enable multi-task annotation for this dataset.
- Returns:
A Dataset object that can be used to interact with the dataset in Snorkel Flow
- Return type:
Dataset
- create_batches(name=None, assignees=None, label_schemas=None, batch_size=None, num_batches=None, randomize=False, random_seed=123, selection_strategy=None, split=None, x_uids=None, filter_by_x_uids_not_in_batch=False, divide_x_uids_evenly_to_assignees=False)
Create annotation batches for this dataset.
This is the recommended entrypoint for creating batches.
- Parameters:
name (Optional[str], default: None) – The name of the batch
assignees (Optional[List[int]], default: None) – The user UIDs for the assignees of the batches
label_schemas (Optional[List[LabelSchema]], default: None) – The label schemas assigned for the batches
batch_size (Optional[int], default: None) – The size of the batches
num_batches (Optional[int], default: None) – The number of batches
randomize (Optional[bool], default: False) – Whether to randomize the batches
random_seed (Optional[int], default: 123) – The seed for the randomization
selection_strategy (Optional[SelectionStrategy], default: None) – The SelectionStrategy for the batches
split (Optional[str], default: None) – The split (“train”, “test”, or “valid”) of the batches
x_uids (Optional[List[str]], default: None) – A list of datapoint uids to create batches from
filter_by_x_uids_not_in_batch (Optional[bool], default: False) – Whether to create batches with datapoints not in a batch
divide_x_uids_evenly_to_assignees (Optional[bool], default: False) – Whether to divide the datapoints evenly among the provided assignees
- Returns:
The list of created batches
- Return type:
List[Batch]
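As an illustration of how these parameters fit together, the sketch below wraps create_batches in a small helper. The helper name create_validation_batches and the batch name are hypothetical. The helper only passes keywords that appear in the signature above. How batch_size and num_batches interact is not specified in this reference, so only batch_size is passed.

```python
def create_validation_batches(dataset, assignee_uids, batch_size=50):
    """Create shuffled annotation batches over the "valid" split.

    `dataset` is expected to be a snorkelflow.sdk.Dataset; every keyword
    below appears in the create_batches signature documented above.
    """
    return dataset.create_batches(
        name="validation-review",   # hypothetical batch name
        assignees=assignee_uids,    # user UIDs, as integers
        batch_size=batch_size,
        split="valid",
        randomize=True,
        random_seed=123,            # the documented default, made explicit
    )
```

A call such as create_validation_batches(Dataset.get("my-dataset"), [1, 2]) would return whatever create_batches returns, documented above as a list of Batch objects.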
- create_datasource(data, uid_col, name=None, split='train', sync=True, run_checks=True)
Creates a new data source within the Dataset from either a filepath or a Pandas DataFrame.
If you provide a filepath: A file can be a CSV or Parquet file that either exists in the local filesystem, or is accessible via an S3-compatible API (such as MinIO, or AWS S3). Files must have a stable integer index column that is unique across all data sources in the dataset.
If you provide a DataFrame: The DataFrame must have a unique integer column that does not contain duplicates across other sources of data. All DataFrame column names must be strings.
The data must pass all validation checks to be registered as a valid data source. If a DataFrame fails to pass all data validation checks, the upload will fail and the data source will not be registered.
Examples
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> my_dataset.create_datasource("my_data.csv", uid_col="id", split="train")
+0.07s Starting data ingestion
+1.85s Ingesting data
+2.05s Data ingestion complete
1 # UID of the datasource
>>> my_dataset.create_datasource(df, uid_col="id", name="dataframe-data", split="train")
+0.07s Starting data ingestion
+1.85s Ingesting data
+2.05s Data ingestion complete
1
- Parameters:
data (Union[str, DataFrame]) – Either a path to a file in the local filesystem or on an S3-compatible storage service, or a Pandas DataFrame. One of the two must be provided
uid_col (str) – Name of the UID column for this data. The values in this column must be unique non-negative integers that are not duplicated across files
name (Optional[str], default: None) – The name to give this data source. If not provided, the name of the file will be used, by default None. Adding a name is strongly recommended when uploading a DataFrame
split (Optional[str], default: 'train') – The name of the data split this data belongs to, by default “train”
sync (Optional[bool], default: True) – Whether this function should block until ingestion completes, by default True. Note that Dataset().datasources may not be updated immediately if sync=False
run_checks (bool, default: True) – Whether we should run datasource checks. Recommended for safety, by default True
- Returns:
If sync is True, returns the integer UID of the datasource. If sync is False, returns a job ID that can be monitored with sf.poll_job_id
- Return type:
Union[str, int]
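Since the return value depends on sync, and a name is strongly recommended for DataFrame uploads, a thin wrapper can make both explicit. This is a sketch only. The upload_frame helper is hypothetical, and restricting split to "train", "valid", and "test" assumes that the set documented for other methods in this reference also applies here.

```python
VALID_SPLITS = ("train", "valid", "test")  # assumed to apply to create_datasource


def upload_frame(dataset, df, uid_col, name, split="train", wait=True):
    """Upload a DataFrame as a named data source.

    Returns the integer data source UID when wait=True, or the job ID
    (per the return type above) when wait=False.
    """
    if split not in VALID_SPLITS:
        raise ValueError(f"split must be one of {VALID_SPLITS}, got {split!r}")
    if not name:
        raise ValueError("give DataFrame uploads an explicit name")
    return dataset.create_datasource(
        df, uid_col=uid_col, name=name, split=split, sync=wait
    )
```

With wait=False the caller receives a job ID and, as noted above, the datasources listing may not reflect the new data source immediately.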
- create_label_schema(name, data_type, task_type, label_map, multi_label=False, description=None, label_column=None, label_descriptions=None, primary_field=None)
Create a label schema associated with this dataset.
This is the recommended entrypoint for creating label schemas.
- Parameters:
name (str) – The name of the label schema
data_type (str) – The data type of the label schema
task_type (str) – The task type of the label schema
label_map (Union[Dict[str, int], List[str]]) – A dictionary mapping label names to their integer values, or a list of label names
multi_label (bool, default: False) – Whether the label schema is a multi-label schema, by default False
description (Optional[str], default: None) – A description of the label schema, by default None
label_column (Optional[str], default: None) – The name of the column that contains the labels, by default None
label_descriptions (Optional[Dict[str, str]], default: None) – A dictionary mapping label names to their descriptions, by default None
primary_field (Optional[str], default: None) – The primary field of the label schema, by default None
- Returns:
The label schema object
- Return type:
LabelSchema
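The label_map and label_descriptions arguments share the same label names as keys, so it can be worth building and cross-checking them together before calling create_label_schema. The sketch below is plain Python with hypothetical label names. Numbering labels from 0 is an assumption, since this reference does not state which integer values are expected or what values the list form assigns. The call itself is omitted because the accepted data_type and task_type values are not listed here.

```python
# Hypothetical label names for a contract classification task.
label_names = ["employment", "lease", "nda"]

# Explicit dictionary form: label name -> integer value (numbering is an assumption).
label_map = {name: value for value, name in enumerate(label_names)}

# Optional descriptions, keyed by the same label names.
label_descriptions = {
    "employment": "Employment agreements",
    "lease": "Property or equipment leases",
    "nda": "Non-disclosure agreements",
}

# Catch mismatched keys locally before sending anything to the platform.
unknown = set(label_descriptions) - set(label_map)
if unknown:
    raise ValueError(f"descriptions given for unknown labels: {sorted(unknown)}")

print(label_map)
```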
- classmethod delete(dataset, force=False)
Delete a dataset based on the provided identifier.
The operation will fail if any applications use this Dataset, unless force=True is passed.
Examples
>>> from snorkelflow.sdk import Dataset
>>> Dataset.delete("my-dataset")
Successfully deleted dataset my-dataset with UID 0.
- Parameters:
dataset (Union[str, int]) – Name or UID of the dataset to delete
force (bool, default: False) – If True, delete any applications using the Dataset as well
- Return type:
None
- delete_datasource(datasource_uid, force=False, sync=True)
Delete a data source. Calling delete_datasource will fully remove the data source from the dataset.
Warning: The operation will not be permitted if any applications are using the data source, to avoid breaking downstream applications. If you are sure you want to delete the data source, use the flag force=True to override this check. This function may take a while.
Examples
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> my_dataset.delete_datasource(1)
Successfully deleted datasource with UID 1.
- Parameters:
datasource_uid (int) – UID of the data source to delete. See all datasources for this dataset by viewing self.datasources.
force (bool, default: False) – Force deletion of a datasource even if that datasource has dependent assets (ground truth, annotations, etc.), by default False
sync (bool, default: True) – Poll job status and block until complete, by default True
- Returns:
Returns the job ID if sync is False. Otherwise returns None
- Return type:
Optional[str]
- classmethod get(dataset)
Fetches an already-created Dataset from Snorkel Flow and returns a Dataset object that can be used to interact with files and data.
Examples
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
Successfully retrieved dataset my-dataset with UID 0 in workspace 0.
- Parameters:
dataset (Union[str, int]) – Either the name or UID of the dataset. A list of all accessible datasets can be retrieved with Dataset.list()
- Returns:
A Dataset object that can be used to interact with files and data in Snorkel Flow.
- Return type:
Dataset
- get_dataframe(split=None, max_rows=10, target_columns=None, datasource_uid=None, use_source_index=True)
Read the Dataset’s data into an in-memory Pandas DataFrame. If only a subset of columns is required, they can be specified with target_columns. Note that changes to the DataFrame will not be reflected in the Dataset. To change the actual data in the dataset, you must swap out the relevant data sources.
Note: By default, only 10 rows are read for memory safety. This limit can be increased by setting max_rows to a larger value, but this can be computationally intensive and may lead to unstable behavior.
Note: By default, the returned DataFrame uses the original index column name the data source was uploaded with. However, certain SDK workflows, such as the snorkelflow.sdk.Deployment.execute function, might require an internal representation of the index column. If you run into issues because of this, run dataset.get_dataframe with the use_source_index parameter set to False.
Examples
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> df = my_dataset.get_dataframe(target_columns=["a", "b"])
<pd.DataFrame object with 10 rows and columns a, b>
>>> df = my_dataset.get_dataframe(datasource_uid=0, max_rows=None)
<pd.DataFrame object with 100 rows and columns a, b, c>
- Parameters:
split (Optional[str], default: None) – The data split to load, by default None (all splits). Other options are “train”, “valid”, and “test”.
max_rows (Optional[int], default: 10) – The maximum number of rows to read, by default 10. Use max_rows=None to fetch all rows. Warning: setting this to a large value can be computationally intensive and may lead to unstable behavior
target_columns (Optional[List[str]], default: None) – A list of desired data columns, in case not all columns are required, by default None
datasource_uid (Optional[int], default: None) – Fetch a dataframe from a particular datasource_uid. A list of all datasource UIDs can be retrieved with Dataset().datasources
use_source_index (bool, default: True) – If true, returns the index column that the data source was originally uploaded with. If false, returns the Snorkel Flow internal column name. True by default.
- Returns:
A Pandas DataFrame object displaying the data in this dataset
- Return type:
pd.DataFrame
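One way to combine get_dataframe with the datasources property is to pull a small preview of every data source, keeping max_rows low for the memory-safety reasons noted above. The preview_each_datasource helper below is a hypothetical sketch. It relies only on the datasource_uid field and the get_dataframe keywords documented in this reference.

```python
def preview_each_datasource(dataset, max_rows=10, columns=None):
    """Map each data source UID to a small preview DataFrame.

    `dataset` is expected to be a snorkelflow.sdk.Dataset.
    """
    previews = {}
    for source in dataset.datasources:
        uid = source["datasource_uid"]
        previews[uid] = dataset.get_dataframe(
            datasource_uid=uid,
            max_rows=max_rows,
            target_columns=columns,
        )
    return previews
```

The returned frames are copies for inspection only. As stated above, editing them does not change the Dataset.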
- static list()
Get a list of all Datasets. The returned list includes the Dataset UID, the Dataset name, and additional metadata used to keep track of the Dataset’s properties.
Examples
>>> Dataset.list()
[
{
"name": "test-csv-str",
"uid": 116,
"datasources": []
},
...
]
- Returns:
List of all dataset objects
- Return type:
List[Dataset]
- update(name='')
Update the metadata for this dataset. Only updating the name of this Dataset is currently supported. The new name for the dataset must be unique within the workspace.
Examples
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get(dataset="my-dataset")
>>> my_dataset.update(name="my-new-dataset")
Successfully renamed dataset with UID 0 to my-new-dataset
- Parameters:
name (str, default: '') – The new name for this dataset
- Returns:
Confirmation string if this operation was successful
- Return type:
str
- update_datasource_data(old_datasource_uid, new_data, sync=True)
This function allows you to replace the data of an existing data source with new data. This function can be used if you find an error in an existing value in a data source, or if you need to update values due to changes in your upstream data pipeline. This function requires that all row indexes in the new data source match the row indexes of the old data source. Additionally, all columns must have the same name and the same type.
If your goal is to change the number of columns, the number of rows, or the type of a column, you should consider using an Operator instead.
Warning: This is a potentially dangerous operation, and may take a while to run. For safety, this will always run data source checks on the new data source. Applications and models that use the data source being replaced may become temporarily unavailable as computations are re-run over the new data, and might report different behavior. If you are unsure how to use this function, contact a Snorkel representative.
Examples
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> my_dataset.datasources
[{"datasource_uid": 1, "datasource_name": "test.csv", "split": "train"}]
>>> df = my_dataset.get_dataframe(datasource_uid=1, max_rows=None)
>>> df
| | a | b | c |
| 0| 1 | 0 | bad_path.pdf|
>>> df.iloc[0, 2] = "good_path.pdf"
>>> my_dataset.update_datasource_data(1, df)
Successfully replaced data in datasource with UID 1.
- Parameters:
old_datasource_uid (int) – The UID of the data source you want to swap out. You can see a list of all data sources for this dataset by viewing self.datasources.
new_data (Union[str, DataFrame]) – Either (1) a path to a file in the local filesystem or on an S3-compatible storage service, or (2) a Pandas DataFrame. One of the two must be provided. The columns and UIDs of the new data must exactly match those of the data being replaced. Use dataset.get_dataframe(datasource_uid=old_datasource_uid) to see the existing data.
sync (bool, default: True) – Poll job status and block until all jobs are complete, by default True
- Returns:
Returns a Job ID that can be polled if sync is False. Otherwise returns None
- Return type:
Optional[str]
- Raises:
ValueError – If the data provided is neither a valid file path nor a valid Pandas DataFrame
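Because the replacement must match the existing data in column names, column types, and row UIDs, it can be cheaper to compare the two frames locally before starting this expensive operation. The sketch below uses only Pandas. The replacement_problems helper is hypothetical, and it assumes the UID is available as an ordinary column named by uid_col.

```python
import pandas as pd


def replacement_problems(old_df, new_df, uid_col):
    """List the ways new_df fails to line up with old_df; empty means compatible."""
    problems = []
    old_cols, new_cols = set(old_df.columns), set(new_df.columns)
    if old_cols != new_cols:
        problems.append(f"column names differ: {sorted(old_cols ^ new_cols)}")
    for col in sorted(old_cols & new_cols):
        if old_df[col].dtype != new_df[col].dtype:
            problems.append(
                f"column {col!r} changed type: "
                f"{old_df[col].dtype} -> {new_df[col].dtype}"
            )
    if uid_col in old_cols and uid_col in new_cols:
        if set(old_df[uid_col]) != set(new_df[uid_col]):
            problems.append("row UIDs differ")
    return problems


# Hypothetical frames for illustration.
old = pd.DataFrame({"uid": [0, 1], "c": ["bad_path.pdf", "other.pdf"]})
new = old.copy()
new.loc[0, "c"] = "good_path.pdf"          # same shape, one corrected value
print(replacement_problems(old, new, "uid"))

widened = new.astype({"uid": "float64"})   # same values, different dtype
print(replacement_problems(old, widened, "uid"))
```

This is a local approximation only. The data source checks that the platform runs on the new data still decide whether the replacement is accepted.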
- update_datasource_split(datasource_uid, split)
Change the split of a data source that has already been uploaded to the dataset. This will impact how the data source is used in all future applications.
Warning: This will only impact the Dataset’s data source, and not existing derived data sources. To change the split within applications that have already been created, find the node’s derived data source UID by clicking on “Develop” > “View Data Sources” in the Snorkel Flow UI and use the sf.update_datasource function.
Examples
>>> from snorkelflow.sdk import Dataset
>>> my_dataset = Dataset.get("my-dataset")
>>> my_dataset.datasources
[{"datasource_uid": 1, "datasource_name": "test.csv", "split": "train"}]
>>> my_dataset.update_datasource_split(1, "train")
[123, 456, 789]
- Parameters:
datasource_uid (int) – The integer UID corresponding to the data source you wish to update. You can see a list of all data sources for this dataset by viewing self.datasources.
split (str) – The new split to assign to this data source. Must be one of “train”, “test”, or “valid”.
- Returns:
Returns a list of model nodes that have been impacted by changing the split.
- Return type:
List[int]
- property batches: List[Batch]
A list of batches belonging to this Dataset.
- property datasources: List[Dict[str, Any]]
A list of data sources and associated metadata belonging to this Dataset.
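Since most data source methods take a datasource_uid, a small lookup over this list is often useful. The sketch below is plain Python. The find_datasource_uid helper is hypothetical, and the sample list mirrors the shape shown in the examples in this reference (datasource_uid, datasource_name, split). Real entries may carry additional fields.

```python
def find_datasource_uid(datasources, name):
    """Return the UID of the single data source with the given name."""
    matches = [
        source["datasource_uid"]
        for source in datasources
        if source.get("datasource_name") == name
    ]
    if len(matches) != 1:
        raise LookupError(f"expected exactly one data source named {name!r}, found {len(matches)}")
    return matches[0]


# Sample shaped like the datasources output shown in this reference.
sample = [{"datasource_uid": 1, "datasource_name": "test.csv", "split": "train"}]
print(find_datasource_uid(sample, "test.csv"))
```

With a real Dataset, the first argument would be my_dataset.datasources.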
- property label_schemas: List[LabelSchema]
A list of label schemas belonging to this Dataset.
- property mta_enabled: bool
Whether or not multi-task annotation is enabled for this dataset.
- property name: str
The human-readable name of the dataset.
- property uid: int
The unique integer identifier for the dataset within Snorkel Flow.