Version: 0.96

Working with MinIO

warning

Notice of deprecation

MinIO is being deprecated in upcoming versions of Snorkel Flow, with the transition running through early 2025. Its functionality is being replaced by Snorkel File Service (SFS).

  • We will continue to proactively communicate deprecation changes directly to customers and update the documentation when appropriate. If you have concerns about your current usage of MinIO and whether it will be supported by SFS, contact your Snorkel Success Manager.
  • The MinIO Console will be deprecated as of Q4 2024, but will still be accessible until the end of Q1 2025 to support backwards compatibility for older workflows. After Q1 2025, access to MinIO Console will be removed.
  • Instead, use the Files feature for uploading PDFs and images. Support for arbitrary file type upload and basic file management utilities within the Files feature will be provided by end of Q1 2025 to meet MinIO Console core feature parity.
  • For files other than PDFs and images, you can continue to use Snorkel Flow's SDK. These SDK workflows are detailed below.
  • All SDK workflows documented here will be supported by SFS.

Overview

MinIO is an object store that is compatible with S3. A MinIO API is shipped with Snorkel Flow for data management across the platform.

It allows users to upload and download files that are accessible from both Notebooks within Snorkel Flow and Operators.

Connecting to MinIO

Authentication to the Snorkel Flow MinIO is handled automatically if you are working in Snorkel Flow Notebooks. If you're accessing MinIO elsewhere, you will need to set the following environment variables: MINIO_URL, MINIO_ACCESS_KEY, and MINIO_SECRET_KEY.
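Outside of Snorkel Flow Notebooks, it can help to confirm that the three variables are present before calling the SDK. The following is a minimal sketch using only the Python standard library. The helper name missing_minio_env is hypothetical and is not part of the Snorkel Flow SDK.

```python
import os
from typing import List, Mapping, Optional

# Variable names taken from the paragraph above.
REQUIRED_MINIO_VARS = ("MINIO_URL", "MINIO_ACCESS_KEY", "MINIO_SECRET_KEY")


def missing_minio_env(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the required MinIO variables that are unset or empty.

    Hypothetical helper for illustration; not part of the Snorkel Flow SDK.
    """
    env = os.environ if env is None else env
    return [name for name in REQUIRED_MINIO_VARS if not env.get(name)]


# Example: report anything that still needs to be set in the current process.
# missing = missing_minio_env()
# if missing:
#     print("Set these variables before using MinIO:", ", ".join(missing))
```

Passing a mapping explicitly, instead of reading os.environ, makes the check easy to exercise in a unit test.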

Using MinIO with the Snorkel Flow SDK

When working with the Snorkel Flow SDK, you first need to set the appropriate context to ensure code is being applied to the right workspace and application. To use MinIO, you need to define the workspace:

import snorkelflow.client as sf
ctx = sf.SnorkelFlowContext.from_kwargs(
    workspace_name="your.workspace",  # change this to the name of your current workspace
)

The sf client imported here (an alias for snorkelflow.client) is used in the sections below.

File upload

The SDK provides two explicit methods for uploading files (upload_file) and directories (upload_dir) using the sf client object defined at the top of this section. Both absolute and relative paths are supported. Examples are shown below:

# Upload a file from a local directory to MinIO
local_file_path = "/path/to/local/report.pdf"
remote_file_path = "minio://bucket/path/to/some/report.pdf"

uploaded_file_path = sf.upload_file(local_file_path, remote_file_path)

# Upload a directory from a local directory to MinIO
local_directory = "/path/to/local/directory"
remote_directory = "minio://bucket/upload/directory"

uploaded_dir_path = sf.upload_dir(local_directory, remote_directory)
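The remote paths in these examples follow the pattern minio://bucket/key. When the bucket and key segments come from variables, a small helper can assemble the path consistently. This is a sketch only: minio_uri is a hypothetical name, not an SDK function, and it assumes nothing beyond the minio:// path format shown above.

```python
def minio_uri(bucket: str, *parts: str) -> str:
    """Join a bucket name and key segments into a minio:// path.

    Hypothetical helper for illustration; not part of the Snorkel Flow SDK.
    Leading and trailing slashes on each segment are dropped so that
    segments can be combined without producing doubled separators.
    """
    segments = [bucket.strip("/")]
    segments += [p.strip("/") for p in parts if p.strip("/")]
    return "minio://" + "/".join(segments)


# The result can be passed as the remote path, for example:
# sf.upload_file("/path/to/local/report.pdf",
#                minio_uri("bucket", "path/to/some", "report.pdf"))
```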

File download

The SDK has two explicit methods for downloading files (download_file) and directories (download_dir) using the sf client object defined at the top of this section. Both absolute and relative paths are supported. Examples are shown below:

# Download a file from MinIO to a local directory
remote_file_path = "minio://bucket/path/to/some/report.pdf"
local_file_path = "/path/to/local/report.pdf"

sf.download_file(remote_file_path, local_file_path)

# Download a directory from MinIO to a local directory
remote_directory = "minio://bucket/upload/directory"
local_directory = "/path/to/local/directory"

sf.download_dir(remote_directory, local_directory)

List directory

To list files in a remote directory, use the list_dir method with the sf client object defined at the top of this section. An example is shown below:

remote_directory = "minio://bucket/upload/directory"

sf.list_dir(remote_directory)
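A listing is often narrowed to one file type before further processing. The sketch below assumes that list_dir returns an iterable of path strings, which this page does not confirm, so check the return value in your environment first. The helper filter_by_suffix and the sample paths are hypothetical.

```python
from typing import Iterable, List, Tuple


def filter_by_suffix(paths: Iterable[str], suffixes: Tuple[str, ...]) -> List[str]:
    """Return the paths ending in one of the suffixes, sorted, ignoring case.

    Hypothetical helper for illustration; not part of the Snorkel Flow SDK.
    """
    lowered = tuple(s.lower() for s in suffixes)
    return sorted(p for p in paths if p.lower().endswith(lowered))


# Made-up listing used only to demonstrate the filter.
sample_listing = [
    "minio://bucket/upload/directory/b.PDF",
    "minio://bucket/upload/directory/a.pdf",
    "minio://bucket/upload/directory/notes.txt",
]
pdf_paths = filter_by_suffix(sample_listing, (".pdf",))

# With the SDK, assuming list_dir yields path strings:
# pdf_paths = filter_by_suffix(sf.list_dir(remote_directory), (".pdf",))
```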

Generic file operations

For file operations, use the SDK function snorkelflow.utils.file.open_file, which will return a file-like object. open_file works with both MinIO and local paths.

Reading a file from MinIO:

from snorkelflow.utils.file import open_file
with open_file("minio://bucket/path/to/some/file", mode="r") as f:
    data = f.read()

Writing a file to MinIO:

from snorkelflow.utils.file import open_file
with open_file("minio://bucket/path/to/some/file", mode="w") as f:
    f.write("Hello, World!")
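Because open_file accepts both MinIO and local paths, the same read and write code can move text between the two. The following sketch takes the opener as a parameter so that it runs with the built-in open on local files. Inside Snorkel Flow you would pass open_file instead. The function name copy_text is hypothetical.

```python
from typing import Callable


def copy_text(src: str, dst: str, opener: Callable = open) -> int:
    """Copy a text file from src to dst and return the number of characters copied.

    `opener` is any open()-like callable that accepts a path and a `mode`
    keyword and returns a file-like context manager. Hypothetical helper
    for illustration; not part of the Snorkel Flow SDK.
    """
    with opener(src, mode="r") as fin:
        data = fin.read()
    with opener(dst, mode="w") as fout:
        fout.write(data)
    return len(data)


# Inside Snorkel Flow, for example:
# from snorkelflow.utils.file import open_file
# copy_text("/path/to/local/notes.txt", "minio://bucket/path/to/some/file", opener=open_file)
```

Reading the whole file into memory keeps the sketch short; for large files, copying in chunks would be more appropriate.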

Example use cases

Files in MinIO can be used as resources in custom Operators and labeling functions. This is useful for caching computationally expensive outputs, such as Hugging Face model outputs, in a file.
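The caching idea can be sketched as a load-or-compute helper: read the cached JSON if it exists, otherwise compute the result and write it for next time. This is an illustration under stated assumptions, not an SDK feature. The name load_or_compute is hypothetical, and the sketch assumes the opener raises FileNotFoundError for a missing path, as the built-in open does. Whether open_file does the same for MinIO paths is not confirmed here.

```python
import json
from typing import Any, Callable


def load_or_compute(path: str, compute: Callable[[], Any], opener: Callable = open) -> Any:
    """Return the JSON cached at `path`, computing and writing it on a miss.

    Hypothetical helper for illustration; not part of the Snorkel Flow SDK.
    Assumes `opener` raises FileNotFoundError when `path` does not exist.
    """
    try:
        with opener(path, mode="r") as f:
            return json.load(f)
    except FileNotFoundError:
        # Cache miss: run the expensive step once and store its output.
        result = compute()
        with opener(path, mode="w") as f:
            json.dump(result, f)
        return result


# Inside Snorkel Flow, for example (run_model is a placeholder for your own function):
# from snorkelflow.utils.file import open_file
# preds = load_or_compute("minio://bucket/path/to/some/resource.json", run_model, opener=open_file)
```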

Labeling function example

import snorkelflow.client as sf
from snorkelflow.studio import resources_fn_labeling_function

ctx = sf.SnorkelFlowContext.from_kwargs(
    workspace_name="your.workspace",  # change this to the name of your current workspace
)

# Replace "my_application" with the actual application name
application_name = "my_application"

# Fetch the model node for the given application
node = sf.get_model_node(application_name)

def get_minio_file():
    from snorkelflow.utils.file import open_file
    import json
    ext_file = "minio://bucket/path/to/some/resource.json"
    with open_file(ext_file, mode='r') as f:
        ext_resource = json.load(f)
    return {"ext_resource": ext_resource}

@resources_fn_labeling_function(name="sample_code_lf", resources_fn=get_minio_file)
def lf(x, ext_resource):
    if x in ext_resource:
        return "LABEL"
    return "UNKNOWN"

sf.add_code_lf(node, lf, label="LABEL")

Custom operator class example

The file at the MinIO path should be opened within the _compute_features method.

from typing import Any, Dict, Optional

import pandas as pd  # needed at module level for the type annotations below
from snorkelflow.operators.featurizer import Featurizer, OpProgressCallback

class CustomFeaturizer(Featurizer):
    """Preprocessor that retrieves model predictions from a file"""

    def __init__(self, file_path: str):
        self.file_path = file_path
        # Both are populated lazily on the first call to _compute_features
        self.saved_model_preds_dict: Optional[Dict] = None
        self.model_pred_df: Optional[pd.DataFrame] = None

    @property
    def input_schema(self):
        return {}

    @property
    def output_schema(self):
        return {"ext_model_predictions": str}

    def no_op_progress_callback(*args: Any, **kwargs: Any) -> None:
        pass

    def _compute_features(self, df: pd.DataFrame, callback: OpProgressCallback = no_op_progress_callback) -> pd.DataFrame:
        # Import here so that open_file is in scope where the file is opened
        from snorkelflow.utils.file import open_file

        # Open resource and cache it
        if self.saved_model_preds_dict is None:
            with open_file(self.file_path) as f:
                self.model_pred_df = pd.read_parquet(f)
            self.saved_model_preds_dict = dict(zip(self.model_pred_df.context_uid, self.model_pred_df.preds))

        # Map predictions based on the context_uid
        df["ext_model_predictions"] = df['context_uid'].map(self.saved_model_preds_dict)
        return df