
Upload data to MinIO

Overview

MinIO is an S3-compatible object store. Snorkel Flow ships with a MinIO API and GUI for data management across the platform.

To access the GUI, click your profile at the bottom of the sidebar menu, then Resources > MinIO Object Storage. Once logged in, you can upload and manage your files. The rest of this article focuses on programmatic interactions with MinIO within the platform.

Interactions with MinIO

In-platform notebook

Authentication to the Snorkel Flow MinIO instance is handled transparently in the in-platform notebook environment. If you're accessing MinIO from outside the notebook, you will need to set the following environment variables: MINIO_URL, MINIO_ACCESS_KEY, and MINIO_SECRET_KEY.
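
Outside the notebook, one way to provide these values is to set the environment variables before using the SDK. A minimal sketch, with placeholder values that you should replace with the MinIO URL and credentials for your deployment:

import os

# Placeholder values; substitute the MinIO URL and credentials for your deployment
os.environ["MINIO_URL"] = "https://minio.example-snorkel-flow.com"
os.environ["MINIO_ACCESS_KEY"] = "your-access-key"
os.environ["MINIO_SECRET_KEY"] = "your-secret-key"

You can also export these variables in your shell before starting Python; either way, subsequent open_file calls against minio:// paths behave the same as they do in the notebook.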

To open a file, use the SDK function snorkelflow.utils.file.open_file, which returns a file-like object. open_file works with both MinIO and local paths.

Example

from snorkelflow.utils.file import open_file
with open_file("minio://bucket/path/to/some/file", mode="r") as f:
    data = f.read()
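
Since open_file also accepts local paths, the same pattern works for files on the notebook's local filesystem. A minimal sketch with an illustrative path:

from snorkelflow.utils.file import open_file

# Local paths work the same way as minio:// paths
with open_file("/path/to/local/file.txt", mode="r") as f:
    data = f.read()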

Custom labeling functions and custom operator classes

You can use files in MinIO as resources in custom operators and labeling functions. This is useful when you want to save computationally expensive outputs, e.g., Hugging Face model outputs, to a cached file.
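
For example, you might run an expensive model once, write its outputs to MinIO, and then load that file as a resource from your labeling functions or operators. A minimal sketch, assuming open_file supports write mode and using an illustrative bucket path and a hypothetical compute_model_outputs helper:

import json
from snorkelflow.utils.file import open_file

# Hypothetical helper that runs an expensive model (e.g., a Hugging Face pipeline)
# and returns a JSON-serializable dict of outputs
model_outputs = compute_model_outputs()

# Cache the outputs in MinIO so downstream LFs and operators can reuse them
with open_file("minio://bucket/path/to/some/resource.json", mode="w") as f:
    json.dump(model_outputs, f)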

Labeling function example
from snorkelflow.studio import resources_fn_labeling_function

def get_minio_file():
    from snorkelflow.utils.file import open_file
    import json

    # Load a JSON resource from MinIO; it is passed to the LF as a keyword argument
    ext_file = "minio://bucket/path/to/some/resource.json"
    with open_file(ext_file, mode="r") as f:
        ext_resource = json.load(f)
    return {"ext_resource": ext_resource}

@resources_fn_labeling_function(name="sample_code_lf", resources_fn=get_minio_file)
def lf(x, ext_resource):
    if x in ext_resource:
        return "LABEL"
    return "UNKNOWN"

# node is the ID of the node to register this LF on
sf.add_code_lf(node, lf, label="LABEL")
Custom operator class example

The MinIO path should be opened inside the _compute_features method.

from typing import Any, Dict, Optional

import pandas as pd

from snorkelflow.operators.featurizer import Featurizer, OpProgressCallback
from snorkelflow.utils.file import open_file

class CustomFeaturizer(Featurizer):
    """Preprocessor that retrieves model predictions from a file"""

    def __init__(self, file_path: str):
        self.file_path = file_path
        self.saved_model_preds_dict: Optional[Dict] = None
        self.model_pred_df: Optional[pd.DataFrame] = None

    @property
    def input_schema(self):
        return {}

    @property
    def output_schema(self):
        return {"ext_model_predictions": str}

    def no_op_progress_callback(*args: Any, **kwargs: Any) -> None:
        pass

    def _compute_features(self, df: pd.DataFrame, callback: OpProgressCallback = no_op_progress_callback) -> pd.DataFrame:
        # Open the resource once and cache the predictions
        if self.saved_model_preds_dict is None:
            with open_file(self.file_path) as f:
                self.model_pred_df = pd.read_parquet(f)
            self.saved_model_preds_dict = dict(zip(self.model_pred_df.context_uid, self.model_pred_df.preds))

        # Map the prediction based on the context_uid
        df["ext_model_predictions"] = df["context_uid"].map(self.saved_model_preds_dict)
        return df
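
A minimal sketch of how this featurizer might be exercised, assuming a parquet file with context_uid and preds columns already exists at the illustrative MinIO path below; _compute_features is called directly here only to show the caching logic, rather than registering the operator through the SDK:

import pandas as pd

featurizer = CustomFeaturizer("minio://bucket/path/to/model_preds.parquet")

# Any DataFrame with a context_uid column can be enriched with the cached predictions;
# unseen context_uids map to NaN
sample_df = pd.DataFrame({"context_uid": [0, 1, 2], "text": ["a", "b", "c"]})
out_df = featurizer._compute_features(sample_df)
print(out_df["ext_model_predictions"])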

Misc

Another useful SDK function that complements open_file is snorkelflow.object_storage.download_remote_object. It downloads objects from remote storage (for example, S3) and stores them in your MinIO object store.

# Download an object from S3 and store it in MinIO
import snorkelflow.client as sf

ctx = sf.SnorkelFlowContext.from_kwargs(...)

minio_path_downloaded_obj = sf.object_storage.download_remote_object(
    "https://s3.amazonaws.com/",
    "some-s3-bucket",
    "dev-split.csv",
    "AWS_ACCESS_KEY",
    "AWS_SECRET_KEY",
    "us-west-2",  # region is required for S3
)

# Read the downloaded object back from MinIO
from snorkelflow.utils.file import open_file
import pandas as pd

with open_file(minio_path_downloaded_obj, mode="r") as f:
    df = pd.read_csv(f)