snorkelflow.operators.span_reducer
- class snorkelflow.operators.span_reducer(name=None, datapoint_instance=&lt;span datapoint&gt;, resources=None, input_schema=None, output_schema=None)
Bases: object
Decorator for aggregating span-level model predictions to document-level predictions for extraction tasks.
The decorator wraps a function that takes a Pandas DataFrame as input and returns a new Pandas DataFrame. The index of the new DataFrame must be the same as the original DataFrame's, unless datapoint_instance is specified. The available datapoint types can be found in DATAPOINT_TYPES, imported from snorkelflow.utils.datapoint.

Warning: Using custom span reducers is only recommended for advanced users, as it currently requires knowledge of internal fields and index handling.
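Before writing a reducer, you can inspect the available datapoint types directly. A minimal sketch, assuming only that DATAPOINT_TYPES is an importable collection as noted above:

from snorkelflow.utils.datapoint import DATAPOINT_TYPES

# List the datapoint types a reducer can target via `datapoint_instance`.
print(DATAPOINT_TYPES)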
Examples

The following pattern of grouping by and reducing over the "context_uid" index is common.

import pandas as pd

import snorkelflow.client as sf  # assumed client import; provides sf.add_operator used below
from snorkelflow.operators import span_reducer
from snorkelflow.extraction.span import SpanCols
# Get the first span in each document
@span_reducer(name="first_reducer")
def first_reducer(df: pd.DataFrame) -> pd.DataFrame:
    df = df.loc[df.groupby([SpanCols.CONTEXT_UID])[SpanCols.CHAR_START].idxmin()]
    return df
# Get the last span in the doc and convert to a DocDatapoint
from snorkelflow.utils.datapoint import DocDatapoint
@span_reducer(name="last_reducer", datapoint_instance=DocDatapoint())
def last_reducer(df: pd.DataFrame) -> pd.DataFrame:
    # idxmax selects the span with the latest start, i.e. the last span
    df = df.loc[df.groupby([SpanCols.CONTEXT_UID])[SpanCols.CHAR_START].idxmax()]
    return df
# Get multiple spans per doc expressed as a list
@span_reducer(name="multi_reducer", datapoint_instance=DocDatapoint())
def multi_reducer(df: pd.DataFrame) -> pd.DataFrame:
    index_col = df.index.name
    df = df.reset_index()
    df = df.groupby([SpanCols.CONTEXT_UID]).agg(list)
    # Re-index each document row by its first span's original index value
    df[index_col] = df[index_col].str[0]
    df = df.reset_index().set_index(index_col)
    return df
# Register any of the above reducers to make them available to add in the DAG
sf.add_operator(multi_reducer)
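To make the index handling from the warning above concrete, here is a minimal sketch of the first-span pattern applied to a hand-built toy frame; the plain helper function, the column values, and the span_uid index name are illustrative assumptions, not internal Snorkel Flow values:

import pandas as pd

# Toy span-level frame: two documents, three spans (values are made up).
toy = pd.DataFrame(
    {
        "context_uid": [0, 0, 1],
        "char_start": [5, 12, 3],
        "char_end": [9, 20, 8],
    },
    index=pd.Index(["s0", "s1", "s2"], name="span_uid"),
)

# Same logic as first_reducer above, as a plain function for illustration.
def first_span(df: pd.DataFrame) -> pd.DataFrame:
    return df.loc[df.groupby(["context_uid"])["char_start"].idxmin()]

print(first_span(toy))
# Keeps rows "s0" and "s2" (earliest char_start per context_uid); the
# surviving rows retain their original span-level index, as required when
# no datapoint_instance is given.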
Parameters:

- name (Optional[str], default: None) – Name of the reducer.
- f (Callable) – Function that accepts a pd.DataFrame as input and outputs a pd.DataFrame.
- datapoint_instance (Optional[DatapointType], default: the span datapoint with columns ['context_uid', 'span_field', 'span_field_value_hash', 'char_start', 'char_end']) – An instance of the datapoint type to reduce to. For example, if you are reducing from spans to docs, this should be DocDatapoint.
- resources (Optional[Dict[str, Any]], default: None) – Resources passed in to f via kwargs.
- input_schema (Optional[Dict[str, Any]], default: None) – Dictionary mapping from column to dtype, used to validate the dtypes of the input DataFrame.
- output_schema (Optional[Dict[str, Any]], default: None) – Dictionary mapping from column to dtype, used to validate the dtypes of the output DataFrame.
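The resources and schema parameters are not exercised by the examples above. Here is a minimal sketch of how they fit together; the min_chars resource and the dtype strings are illustrative assumptions:

# Sketch: combining resources with input/output schema validation.
@span_reducer(
    name="long_span_reducer",
    resources={"min_chars": 4},  # delivered to the function via kwargs
    input_schema={"char_start": "int64", "char_end": "int64"},
    output_schema={"char_start": "int64", "char_end": "int64"},
)
def long_span_reducer(df: pd.DataFrame, min_chars: int) -> pd.DataFrame:
    # Drop spans shorter than min_chars, then keep each document's first span.
    df = df[(df[SpanCols.CHAR_END] - df[SpanCols.CHAR_START]) >= min_chars]
    return df.loc[df.groupby([SpanCols.CONTEXT_UID])[SpanCols.CHAR_START].idxmin()]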
- __init__(name=None, datapoint_instance=&lt;span datapoint&gt;, resources=None, input_schema=None, output_schema=None)
Methods
__init__([name, datapoint_instance, ...])