
snorkelflow.operators.span_reducer

class snorkelflow.operators.span_reducer(name=None, datapoint_instance=SpanDatapoint(name='span', columns=['context_uid', 'span_field', 'span_field_value_hash', 'char_start', 'char_end']), resources=None, input_schema=None, output_schema=None)

Bases: object

Decorator for aggregating span-level model predictions to document-level predictions for extraction tasks.

The decorator wraps a function that takes a pandas DataFrame as input and returns a new pandas DataFrame. The index of the new DataFrame must be the same as that of the original DataFrame, unless a datapoint_instance is specified. The available datapoint types can be found in DATAPOINT_TYPES, imported from snorkelflow.utils.datapoint.
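For example, a quick way to inspect the available datapoint types (this assumes DATAPOINT_TYPES is a plain collection that prints meaningfully):

from snorkelflow.utils.datapoint import DATAPOINT_TYPES

# List the datapoint types that can be passed as datapoint_instance
print(DATAPOINT_TYPES)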

Warning
Using custom span reducers is recommended only for advanced users, as it currently requires knowledge of internal fields and index handling.

Examples

The following pattern of grouping by and reducing over the "context_uid" index is common.

import pandas as pd

from snorkelflow.operators import span_reducer
from snorkelflow.extraction.span import SpanCols

# Get the first span in each document
@span_reducer(name="first_reducer")
def first_reducer(df: pd.DataFrame) -> pd.DataFrame:
    df = df.loc[df.groupby([SpanCols.CONTEXT_UID])[SpanCols.CHAR_START].idxmin()]
    return df

# Get the last span in each document and convert to a DocDatapoint
from snorkelflow.utils.datapoint import DocDatapoint

@span_reducer(name="last_reducer", datapoint_instance=DocDatapoint())
def last_reducer(df: pd.DataFrame) -> pd.DataFrame:
    df = df.loc[df.groupby([SpanCols.CONTEXT_UID])[SpanCols.CHAR_START].idxmax()]
    return df

# Get multiple spans per document, expressed as lists
@span_reducer(name="multi_reducer", datapoint_instance=DocDatapoint())
def multi_reducer(df: pd.DataFrame) -> pd.DataFrame:
    # Aggregate every column into a per-document list, then rebuild the
    # index from the first span's index value in each document.
    index_col = df.index.name
    df = df.reset_index()
    df = df.groupby([SpanCols.CONTEXT_UID]).agg(list)
    df[index_col] = df[index_col].str[0]
    df = df.reset_index().set_index(index_col)
    return df

# Register any of the above reducers to make them available in the DAG
# (assumes the Snorkel Flow client has been imported as `sf`)
sf.add_operator(multi_reducer)
Parameters:
  • name (Optional[str], default: None) – Name of the reducer.

  • f – Function that accepts a pd.DataFrame as input and returns a pd.DataFrame.

  • datapoint_instance (Optional[DatapointType], default: SpanDatapoint(name='span', columns=['context_uid', 'span_field', 'span_field_value_hash', 'char_start', 'char_end'])) – An instance of the datapoint type to reduce to. For example, if you are reducing from spans to docs, this should be DocDatapoint.

  • resources (Optional[Dict[str, Any]], default: None) – Resources passed to f via kwargs.

  • input_schema (Optional[Dict[str, Any]], default: None) – Dictionary mapping column to dtype, used to validate the dtypes of the input DataFrame.

  • output_schema (Optional[Dict[str, Any]], default: None) – Dictionary mapping column to dtype, used to validate the dtypes of the output DataFrame.
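The resources and schema parameters are not exercised in the examples above; the following is a minimal sketch of how they might fit together. The min_len resource and the "int64" dtype strings are illustrative assumptions, not documented values.

import pandas as pd

from snorkelflow.operators import span_reducer
from snorkelflow.extraction.span import SpanCols

# Keep the longest span in each document, ignoring spans shorter than
# `min_len`. `min_len` reaches the function as a keyword argument via
# `resources`; the schemas validate the char-offset dtypes on the way
# in and out. (Hypothetical reducer for illustration.)
@span_reducer(
    name="longest_reducer",
    resources={"min_len": 3},
    input_schema={SpanCols.CHAR_START: "int64", SpanCols.CHAR_END: "int64"},
    output_schema={SpanCols.CHAR_START: "int64", SpanCols.CHAR_END: "int64"},
)
def longest_reducer(df: pd.DataFrame, min_len: int) -> pd.DataFrame:
    # Compute span lengths in a temporary column, filter, then select
    # the row with the maximum length per document.
    df = df.assign(_len=df[SpanCols.CHAR_END] - df[SpanCols.CHAR_START])
    df = df[df["_len"] >= min_len]
    df = df.loc[df.groupby([SpanCols.CONTEXT_UID])["_len"].idxmax()]
    return df.drop(columns=["_len"])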

__init__(name=None, datapoint_instance=SpanDatapoint(name='span', columns=['context_uid', 'span_field', 'span_field_value_hash', 'char_start', 'char_end']), resources=None, input_schema=None, output_schema=None)

Methods

__init__([name, datapoint_instance, ...])