Version: 25.1

snorkelflow.operators.dask_operator

class snorkelflow.operators.dask_operator(*, input_schema, name=None, resources=None, resources_fn=None, output_schema=None)

Bases: object

Decorator that wraps a function f mapping a dask.dataframe.DataFrame to another dask.dataframe.DataFrame.

Warning
@dask_operator is recommended only for advanced users. Any changes the operator makes to the index or partitions are carried through into the Snorkel Flow execution pipeline. For most use cases, @pandas_operator is sufficient.

Examples

The following example defines an operator that adds a new column, newcol, filled with random floats.

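import dask.dataframe as dd
import snorkelflow.client as sf
from snorkelflow.operators import dask_operator

@dask_operator(name="set_newcol", input_schema={})
def set_newcol(ddf: dd.DataFrame) -> dd.DataFrame:
    # Dependencies used inside the operator body.
    import random

    import numpy as np
    import pandas as pd
    from dask import dataframe as dd

    # Output metadata: the existing columns plus a float64 column "newcol".
    meta = ddf.dtypes.to_dict()
    meta["newcol"] = np.dtype(float)

    def _set_newcol(df: pd.DataFrame) -> pd.DataFrame:
        df = df.copy()  # avoid mutating the input partition in place
        df["newcol"] = [random.random() for _ in range(len(df))]
        return df

    # Apply the per-partition function lazily across the Dask DataFrame.
    ddf = dd.map_partitions(_set_newcol, ddf, meta=meta)
    return ddf

# Register the operator with Snorkel Flow.
sf.add_operator(set_newcol)
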
Parameters:
  • name (Optional[str], default: None) – Name of the Operator

  • resources (Optional[Dict[str, Any]], default: None) – Resources passed to the wrapped function f as keyword arguments.

  • resources_fn (Optional[Callable[[], Dict[str, Any]]], default: None) – A function that generates a dictionary of values to pass to f as keyword arguments. Use it for values that are too expensive to serialize as resources.

  • input_schema (Dict[str, Any]) – Dictionary mapping column names to dtypes, used to validate the dtypes of the input DataFrame.

  • output_schema (Optional[Dict[str, Any]], default: None) –

    Dictionary mapping column names to dtypes, used to validate the dtypes of the output DataFrame.

    If set (not None), f must not delete any columns, and every new column must be listed in output_schema with its dtype.
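The two output_schema rules above can be expressed as a small check. The sketch below is a hypothetical restatement of those documented rules, not Snorkel Flow's actual validator. The function name schema_violations and its messages are invented for illustration, and the real implementation may compare dtypes differently. It takes the output's .dtypes, which both pandas and Dask DataFrames expose.

```python
from typing import Any, Dict, List

import pandas as pd


def schema_violations(
    input_columns: List[str],
    output_dtypes: pd.Series,
    output_schema: Dict[str, Any],
) -> List[str]:
    """Hypothetical check mirroring the documented output_schema rules."""
    problems: List[str] = []
    # Rule 1: f must not delete any column that was present in the input.
    for col in input_columns:
        if col not in output_dtypes.index:
            problems.append(f"deleted column: {col}")
    # Rule 2: every new column must be declared in output_schema with its dtype.
    for col, dtype in output_dtypes.items():
        if col in input_columns:
            continue
        if col not in output_schema:
            problems.append(f"new column missing from output_schema: {col}")
            continue
        expected = pd.api.types.pandas_dtype(output_schema[col])
        if dtype != expected:
            problems.append(f"dtype mismatch for {col}: expected {expected}, got {dtype}")
    return problems


before = pd.DataFrame({"text": ["a", "b"]})
after = before.assign(newcol=[0.1, 0.2])
print(schema_violations(list(before.columns), after.dtypes, {"newcol": "float64"}))
```

For the newcol example, an output_schema such as {"newcol": float} would satisfy both rules. Dropping the input column text, or omitting newcol from the schema, would each be reported.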

__init__(*, input_schema, name=None, resources=None, resources_fn=None, output_schema=None)

Methods

__init__(*, input_schema[, name, resources, ...])