snorkelflow.operators.span_reducer
- class snorkelflow.operators.span_reducer(name=None, datapoint_instance=&lt;span datapoint: name=span, columns=['context_uid', 'span_field', 'span_field_value_hash', 'char_start', 'char_end']&gt;, resources=None, input_schema=None, output_schema=None)

Bases: object
Decorator for aggregating span-level model predictions into document-level predictions for extraction tasks.

The decorator wraps a function that takes a Pandas DataFrame as input and returns a new Pandas DataFrame. The index of the new DataFrame must match that of the original DataFrame unless datapoint_instance is specified. The available datapoint types can be found in DATAPOINT_TYPES, imported from snorkelflow.utils.datapoint.

Warning: Using custom span reducers is only recommended for advanced users, as it currently requires knowledge of internal fields and index handling.

Examples

The following pattern of grouping by and reducing over the "context_uid" index is common.

import pandas as pd

from snorkelflow.operators import span_reducer
from snorkelflow.extraction.span import SpanCols
# Get the first span in each document
@span_reducer(name="first_reducer")
def first_reducer(df: pd.DataFrame) -> pd.DataFrame:
df = df.loc[df.groupby([SpanCols.CONTEXT_UID])[SpanCols.CHAR_START].idxmin()]
return df
# Get the last span in the doc and convert to a DocDatapoint
from snorkelflow.utils.datapoint import DocDatapoint
@span_reducer(name="last_reducer", datapoint_instance=DocDatapoint())
def last_reducer(df: pd.DataFrame) -> pd.DataFrame:
    df = df.loc[df.groupby([SpanCols.CONTEXT_UID])[SpanCols.CHAR_START].idxmax()]
return df
# Get multiple spans per doc expressed as a list
@span_reducer(name="multi_reducer", datapoint_instance=DocDatapoint())
def multi_reducer(df: pd.DataFrame) -> pd.DataFrame:
from snorkelflow.extraction.span import SpanCols
index_col = df.index.name
df = df.reset_index()
df = df.groupby([SpanCols.CONTEXT_UID]).agg(list)
df[index_col] = df[index_col].str[0]
df = df.reset_index().set_index(index_col)
return df
# Register any of the above reducers to make them available to add in the DAG
sf.add_operator(multi_reducer)
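The reduction logic inside these reducers is plain pandas. The following sketch runs the same first-span and list-aggregation patterns on a toy DataFrame; the column and index names here are illustrative stand-ins for SpanCols.CONTEXT_UID and friends, and no Snorkel Flow runtime is assumed:

```python
import pandas as pd

# Toy span-level frame: two documents (context_uid 0 and 1),
# indexed by a span id, as a span reducer would receive it.
df = pd.DataFrame(
    {
        "context_uid": [0, 0, 0, 1, 1],
        "char_start": [40, 5, 12, 7, 3],
    },
    index=pd.Index(["s0", "s1", "s2", "s3", "s4"], name="span_uid"),
)

# first_reducer pattern: keep the span with the smallest char_start
# in each document via groupby + idxmin.
first = df.loc[df.groupby(["context_uid"])["char_start"].idxmin()]

# multi_reducer pattern: collect every span per document into lists,
# then restore a one-row-per-document index from the first span id.
index_col = df.index.name
flat = df.reset_index()
multi = flat.groupby(["context_uid"]).agg(list)
multi[index_col] = multi[index_col].str[0]
multi = multi.reset_index().set_index(index_col)
```

After this runs, `first` keeps one row per document (the span with the lowest char_start), and `multi` has one row per document whose char_start cell holds the list of all span offsets in that document.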
Parameters
| Name | Type | Default | Info |
| --- | --- | --- | --- |
| name | Optional[str] | None | Name of the reducer. |
| f | | | Function that accepts a pd.DataFrame as input and outputs a pd.DataFrame. |
| datapoint_instance | Optional[DatapointType] | span datapoint (columns=['context_uid', 'span_field', 'span_field_value_hash', 'char_start', 'char_end']) | An instance of the datapoint type to reduce to. For example, if you are reducing from spans to docs, this should be DocDatapoint. |
| resources | Optional[Dict[str, Any]] | None | Resources passed in to f via kwargs. |
| input_schema | Optional[Dict[str, Any]] | None | Dictionary mapping from column to dtype, used to validate the dtypes of the input DataFrame. |
| output_schema | Optional[Dict[str, Any]] | None | Dictionary mapping from column to dtype, used to validate the dtypes of the output DataFrame. |

- __init__(name=None, datapoint_instance=&lt;span datapoint: name=span, columns=['context_uid', 'span_field', 'span_field_value_hash', 'char_start', 'char_end']&gt;, resources=None, input_schema=None, output_schema=None)
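The input_schema and output_schema parameters are plain column-to-dtype mappings. Checking a frame against such a mapping can be sketched with pandas alone; the helper below is purely illustrative and is not a Snorkel Flow API:

```python
import pandas as pd

def validate_schema(df: pd.DataFrame, schema: dict) -> None:
    """Check that each column named in schema exists with the expected dtype."""
    for col, dtype in schema.items():
        if col not in df.columns:
            raise ValueError(f"missing column: {col}")
        if str(df[col].dtype) != dtype:
            raise TypeError(f"{col}: expected {dtype}, got {df[col].dtype}")

df = pd.DataFrame({"context_uid": [0, 1], "char_start": [5, 3]})
validate_schema(df, {"context_uid": "int64", "char_start": "int64"})  # passes
```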
Methods

__init__([name, datapoint_instance, ...])