
TensorFlow Data Validation API Documentation

tensorflow_data_validation

Init module for TensorFlow Data Validation.

Attributes

FeaturePath module-attribute

FeaturePath = FeaturePath

__version__ module-attribute

__version__ = '1.17.0.dev'

Classes

CombinerStatsGenerator

CombinerStatsGenerator(
    name: Text, schema: Optional[Schema] = None
)

Bases: Generic[ACCTYPE], StatsGenerator

A StatsGenerator which computes statistics using a combiner function.

This class computes statistics using a combiner function. It emits a partial state after processing each batch of examples, merges the partial states, and finally computes the statistics from the merged partial state at the end.

This object mirrors a beam.CombineFn except for the add_input interface, which is expected to be defined by its subclasses. Specifically, the generator must implement the following four methods:

create_accumulator(): Initializes an accumulator to store the partial state and returns it.

add_input(accumulator, input_record_batch): Incorporates a batch of input examples (represented as an Arrow RecordBatch) into the current accumulator and returns the updated accumulator.

merge_accumulators(accumulators): Merges the partial states in the accumulators and returns the accumulator containing the merged state.

extract_output(accumulator): Computes statistics from the partial state in the accumulator and returns the result as a DatasetFeatureStatistics proto.
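A minimal sketch of such a generator, assuming a hypothetical ExampleCountGenerator that records the number of examples seen as a feature-level custom statistic (the class name and the synthetic feature path are illustrative, not part of the TFDV API):

  import pyarrow as pa
  import tensorflow_data_validation as tfdv
  from tensorflow_metadata.proto.v0 import statistics_pb2

  class ExampleCountGenerator(tfdv.CombinerStatsGenerator):
    """Counts the number of examples seen (illustrative only)."""

    def __init__(self, schema=None):
      super().__init__('ExampleCountGenerator', schema)

    def create_accumulator(self) -> int:
      # The accumulator is a plain running example count.
      return 0

    def add_input(self, accumulator: int,
                  input_record_batch: pa.RecordBatch) -> int:
      return accumulator + input_record_batch.num_rows

    def merge_accumulators(self, accumulators) -> int:
      return sum(accumulators)

    def extract_output(
        self, accumulator: int) -> statistics_pb2.DatasetFeatureStatistics:
      result = statistics_pb2.DatasetFeatureStatistics()
      feature = result.features.add()
      feature.path.step.append('__example_count__')  # hypothetical feature path
      custom = feature.custom_stats.add()
      custom.name = 'example_count'
      custom.num = accumulator
      return result

  # Custom generators are passed to TFDV via StatsOptions(generators=[...]).
  options = tfdv.StatsOptions(generators=[ExampleCountGenerator()])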

Initializes a statistics generator.

PARAMETER DESCRIPTION
name

A unique name associated with the statistics generator.

TYPE: Text

schema

An optional schema for the dataset.

TYPE: Optional[Schema] DEFAULT: None

Source code in tensorflow_data_validation/statistics/generators/stats_generator.py
def __init__(self, name: Text,
             schema: Optional[schema_pb2.Schema] = None) -> None:
  """Initializes a statistics generator.

  Args:
    name: A unique name associated with the statistics generator.
    schema: An optional schema for the dataset.
  """
  self._name = name
  self._schema = schema
Attributes
name property
name
schema property
schema
Functions
add_input
add_input(
    accumulator: ACCTYPE, input_record_batch: RecordBatch
) -> ACCTYPE

Returns result of folding a batch of inputs into accumulator.

PARAMETER DESCRIPTION
accumulator

The current accumulator, which may be modified and returned for efficiency.

TYPE: ACCTYPE

input_record_batch

An Arrow RecordBatch whose columns are features and rows are examples. The columns are of type List<primitive> or Null (if a feature's value is None across all the examples in the batch, its corresponding column is of Null type).

TYPE: RecordBatch

RETURNS DESCRIPTION
ACCTYPE

The accumulator after updating the statistics for the batch of inputs.

Source code in tensorflow_data_validation/statistics/generators/stats_generator.py
def add_input(self, accumulator: ACCTYPE,
              input_record_batch: pa.RecordBatch) -> ACCTYPE:
  """Returns result of folding a batch of inputs into accumulator.

  Args:
    accumulator: The current accumulator, which may be modified and returned
      for efficiency.
    input_record_batch: An Arrow RecordBatch whose columns are features and
      rows are examples. The columns are of type List<primitive> or Null (If a
      feature's value is None across all the examples in the batch, its
      corresponding column is of Null type).

  Returns:
    The accumulator after updating the statistics for the batch of inputs.
  """
  raise NotImplementedError
compact
compact(accumulator: ACCTYPE) -> ACCTYPE

Returns a compact representation of the accumulator.

This is optionally called before an accumulator is sent across the wire. The base class implementation is a no-op and may be overridden by derived classes.

PARAMETER DESCRIPTION
accumulator

The accumulator to compact.

TYPE: ACCTYPE

RETURNS DESCRIPTION
ACCTYPE

The compacted accumulator. By default, this is the identity.

Source code in tensorflow_data_validation/statistics/generators/stats_generator.py
def compact(self, accumulator: ACCTYPE) -> ACCTYPE:
  """Returns a compact representation of the accumulator.

  This is optionally called before an accumulator is sent across the wire. The
  base class is a no-op. This may be overwritten by the derived class.

  Args:
    accumulator: The accumulator to compact.

  Returns:
    The compacted accumulator. By default is an identity.
  """
  return accumulator
create_accumulator
create_accumulator() -> ACCTYPE

Returns a fresh, empty accumulator.

RETURNS DESCRIPTION
ACCTYPE

An empty accumulator.

Source code in tensorflow_data_validation/statistics/generators/stats_generator.py
def create_accumulator(self) -> ACCTYPE:
  """Returns a fresh, empty accumulator.

  Returns:
    An empty accumulator.
  """
  raise NotImplementedError
extract_output
extract_output(
    accumulator: ACCTYPE,
) -> DatasetFeatureStatistics

Returns result of converting accumulator into the output value.

PARAMETER DESCRIPTION
accumulator

The final accumulator value.

TYPE: ACCTYPE

RETURNS DESCRIPTION
DatasetFeatureStatistics

A proto representing the result of this stats generator.

Source code in tensorflow_data_validation/statistics/generators/stats_generator.py
def extract_output(
    self, accumulator: ACCTYPE) -> statistics_pb2.DatasetFeatureStatistics:
  """Returns result of converting accumulator into the output value.

  Args:
    accumulator: The final accumulator value.

  Returns:
    A proto representing the result of this stats generator.
  """
  raise NotImplementedError
merge_accumulators
merge_accumulators(
    accumulators: Iterable[ACCTYPE],
) -> ACCTYPE

Merges several accumulators to a single accumulator value.

Note: mutating any element in accumulators except for the first is not allowed. The first element may be modified and returned for efficiency.

PARAMETER DESCRIPTION
accumulators

The accumulators to merge.

TYPE: Iterable[ACCTYPE]

RETURNS DESCRIPTION
ACCTYPE

The merged accumulator.

Source code in tensorflow_data_validation/statistics/generators/stats_generator.py
def merge_accumulators(self, accumulators: Iterable[ACCTYPE]) -> ACCTYPE:
  """Merges several accumulators to a single accumulator value.

  Note: mutating any element in `accumulators` except for the first is not
  allowed. The first element may be modified and returned for efficiency.

  Args:
    accumulators: The accumulators to merge.

  Returns:
    The merged accumulator.
  """
  raise NotImplementedError
setup
setup() -> None

Prepares an instance for combining.

Subclasses should put costly initializations here instead of in __init__(), so that 1) the cost is properly recognized by Beam as setup cost (per worker) and 2) the cost is not paid at pipeline construction time.

Source code in tensorflow_data_validation/statistics/generators/stats_generator.py
def setup(self) -> None:
  """Prepares an instance for combining.

     Subclasses should put costly initializations here instead of in
     __init__(), so that 1) the cost is properly recognized by Beam as
     setup cost (per worker) and 2) the cost is not paid at the pipeline
     construction time.
  """
  pass

CrossFeatureView

CrossFeatureView(stats_proto: CrossFeatureStatistics)

Bases: object

View of a single cross feature.

Source code in tensorflow_data_validation/utils/stats_util.py
def __init__(self, stats_proto: statistics_pb2.CrossFeatureStatistics):
  self._statistics = stats_proto
Functions
proto
proto() -> CrossFeatureStatistics

Retrieve the underlying proto.

Source code in tensorflow_data_validation/utils/stats_util.py
def proto(self) -> statistics_pb2.CrossFeatureStatistics:
  """Retrieve the underlying proto."""
  return self._statistics

DatasetListView

DatasetListView(stats_proto: DatasetFeatureStatisticsList)

Bases: object

View of statistics for multiple datasets (slices).
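A small usage sketch; the statistics proto below is built inline for illustration (in practice it would typically be loaded from a statistics output produced by TFDV):

  import tensorflow_data_validation as tfdv
  from tensorflow_metadata.proto.v0 import statistics_pb2

  stats = statistics_pb2.DatasetFeatureStatisticsList()
  dataset = stats.datasets.add(name='All Examples', num_examples=3)
  feature = dataset.features.add()
  feature.path.step.append('age')
  feature.num_stats.mean = 42.0

  view = tfdv.DatasetListView(stats)
  default_slice = view.get_default_slice_or_die()
  print(list(default_slice.list_features()))   # one FeaturePath: ['age']
  age_view = default_slice.get_feature('age')
  print(age_view.numeric_statistics().mean)    # 42.0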

Source code in tensorflow_data_validation/utils/stats_util.py
def __init__(self, stats_proto: statistics_pb2.DatasetFeatureStatisticsList):
  self._statistics = stats_proto
  self._slice_map = {}  # type: Dict[str, DatasetView]
  self._initialized = False
Functions
get_default_slice
get_default_slice() -> Optional[DatasetView]
Source code in tensorflow_data_validation/utils/stats_util.py
def get_default_slice(self) -> Optional['DatasetView']:
  self._init_index()
  if len(self._slice_map) == 1:
    for _, v in self._slice_map.items():
      return v
  return self._slice_map.get(constants.DEFAULT_SLICE_KEY, None)
get_default_slice_or_die
get_default_slice_or_die() -> DatasetView
Source code in tensorflow_data_validation/utils/stats_util.py
def get_default_slice_or_die(self) -> 'DatasetView':
  # TODO(b/221453427): Update uses, or consider changing get_default_slice.
  default_slice = self.get_default_slice()
  if default_slice is None:
    raise ValueError('Missing default slice')
  return default_slice
get_slice
get_slice(slice_key: str) -> Optional[DatasetView]
Source code in tensorflow_data_validation/utils/stats_util.py
def get_slice(self, slice_key: str) -> Optional['DatasetView']:
  self._init_index()
  return self._slice_map.get(slice_key, None)
list_slices
list_slices() -> Iterable[str]
Source code in tensorflow_data_validation/utils/stats_util.py
def list_slices(self) -> Iterable[str]:
  self._init_index()
  return self._slice_map.keys()
proto
proto() -> DatasetFeatureStatisticsList

Retrieve the underlying proto.

Source code in tensorflow_data_validation/utils/stats_util.py
def proto(self) -> statistics_pb2.DatasetFeatureStatisticsList:
  """Retrieve the underlying proto."""
  return self._statistics

DatasetView

DatasetView(stats_proto: DatasetFeatureStatistics)

Bases: object

View of statistics for a dataset (slice).

Source code in tensorflow_data_validation/utils/stats_util.py
def __init__(self, stats_proto: statistics_pb2.DatasetFeatureStatistics):
  self._feature_map = {}  # type: Dict[types.FeaturePath, int]
  self._cross_feature_map = {
  }  # type: Dict[Tuple[types.FeaturePath, types.FeaturePath], int]
  self._statistics = stats_proto
  self._initialized = False
Functions
get_cross_feature
get_cross_feature(
    x_path: Union[str, FeaturePath, Iterable[str]],
    y_path: Union[str, FeaturePath, Iterable[str]],
) -> Optional[CrossFeatureView]

Retrieve a cross-feature if it exists, or None.

Source code in tensorflow_data_validation/utils/stats_util.py
def get_cross_feature(
    self, x_path: Union[str, types.FeaturePath,
                        Iterable[str]], y_path: Union[str, types.FeaturePath,
                                                      Iterable[str]]
) -> Optional['CrossFeatureView']:
  """Retrieve a cross-feature if it exists, or None."""

  x_path = _normalize_feature_id(x_path)
  y_path = _normalize_feature_id(y_path)
  self._init_index()
  feature_id = (x_path, y_path)
  index = self._cross_feature_map.get(feature_id, None)
  if index is None:
    return None
  return CrossFeatureView(self._statistics.cross_features[index])
get_derived_feature
get_derived_feature(
    deriver_name: str, source_paths: Sequence[FeaturePath]
) -> Optional[FeatureView]

Retrieve a derived feature based on a deriver name and its inputs.

PARAMETER DESCRIPTION
deriver_name

The name of a deriver. Matches validation_derived_source deriver_name.

TYPE: str

source_paths

Source paths for derived features. Matches validation_derived_source.source_path.

TYPE: Sequence[FeaturePath]

RETURNS DESCRIPTION
Optional[FeatureView]

FeatureView of derived feature.

Source code in tensorflow_data_validation/utils/stats_util.py
def get_derived_feature(
    self, deriver_name: str,
    source_paths: Sequence[types.FeaturePath]) -> Optional['FeatureView']:
  """Retrieve a derived feature based on a deriver name and its inputs.

  Args:
    deriver_name: The name of a deriver. Matches validation_derived_source
      deriver_name.
    source_paths: Source paths for derived features. Matches
      validation_derived_source.source_path.

  Returns:
    FeatureView of derived feature.

  Raises:
    ValueError if multiple derived features match.
  """
  # TODO(b/221453427): Consider indexing if performance becomes an issue.
  results = []
  for feature in self.proto().features:
    if feature.validation_derived_source is None:
      continue
    if feature.validation_derived_source.deriver_name != deriver_name:
      continue
    if (len(source_paths) != len(
        feature.validation_derived_source.source_path)):
      continue
    all_match = True
    for i in range(len(source_paths)):
      if (source_paths[i] != types.FeaturePath.from_proto(
          feature.validation_derived_source.source_path[i])):
        all_match = False
        break
    if all_match:
      results.append(FeatureView(feature))
    if len(results) > 1:
      raise ValueError('Ambiguous result, %d features matched' % len(results))
  if len(results) == 1:
    return results.pop()
  return None
get_feature
get_feature(
    feature_id: Union[str, FeaturePath, Iterable[str]],
) -> Optional[FeatureView]

Retrieve a feature if it exists.

Features specified within the underlying proto by name (instead of path) are normalized to a length 1 path, and can be referred to as such.

PARAMETER DESCRIPTION
feature_id

A types.FeaturePath, Iterable[str] consisting of path steps, or a str, which is converted to a length one path.

TYPE: Union[str, FeaturePath, Iterable[str]]

RETURNS DESCRIPTION
Optional[FeatureView]

A FeatureView, or None if feature_id is not present.

Source code in tensorflow_data_validation/utils/stats_util.py
def get_feature(
    self, feature_id: Union[str, types.FeaturePath, Iterable[str]]
) -> Optional['FeatureView']:
  """Retrieve a feature if it exists.

  Features specified within the underlying proto by name (instead of path) are
  normalized to a length 1 path, and can be referred to as such.

  Args:
    feature_id: A types.FeaturePath, Iterable[str] consisting of path steps,
      or a str, which is converted to a length one path.

  Returns:
    A FeatureView, or None if feature_id is not present.
  """
  feature_id = _normalize_feature_id(feature_id)
  self._init_index()
  index = self._feature_map.get(feature_id, None)
  if index is None:
    return None
  return FeatureView(self._statistics.features[index])
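For illustration, the same feature can be addressed in any of the accepted identifier forms (the proto and feature name below are hypothetical):

  import tensorflow_data_validation as tfdv
  from tensorflow_metadata.proto.v0 import statistics_pb2

  stats = statistics_pb2.DatasetFeatureStatistics()
  feature = stats.features.add()
  feature.path.step.append('age')
  dataset_view = tfdv.DatasetView(stats)

  # Equivalent ways to refer to the same top-level feature:
  assert dataset_view.get_feature('age') is not None
  assert dataset_view.get_feature(tfdv.FeaturePath(['age'])) is not None
  assert dataset_view.get_feature(['age']) is not None

  # None is returned when the feature is not present:
  assert dataset_view.get_feature('no_such_feature') is None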
list_cross_features
list_cross_features() -> Iterable[
    Tuple[FeaturePath, FeaturePath]
]

Lists cross-feature identifiers.

Source code in tensorflow_data_validation/utils/stats_util.py
def list_cross_features(
    self) -> Iterable[Tuple[types.FeaturePath, types.FeaturePath]]:
  """Lists cross-feature identifiers."""
  self._init_index()
  return self._cross_feature_map.keys()
list_features
list_features() -> Iterable[FeaturePath]

Lists feature identifiers.

Source code in tensorflow_data_validation/utils/stats_util.py
def list_features(self) -> Iterable[types.FeaturePath]:
  """Lists feature identifiers."""
  self._init_index()
  return self._feature_map.keys()
proto
proto() -> DatasetFeatureStatistics

Retrieve the underlying proto.

Source code in tensorflow_data_validation/utils/stats_util.py
def proto(self) -> statistics_pb2.DatasetFeatureStatistics:
  """Retrieve the underlying proto."""
  return self._statistics

DetectFeatureSkew

DetectFeatureSkew(
    identifier_features: List[FeatureName],
    features_to_ignore: Optional[List[FeatureName]] = None,
    sample_size: int = 0,
    float_round_ndigits: Optional[int] = None,
    allow_duplicate_identifiers: bool = False,
)

Bases: PTransform

API for detecting feature skew between training and serving examples.

Example:

  with beam.Pipeline(runner=...) as p:
     training_examples = p | 'ReadTrainingData' >>
       beam.io.ReadFromTFRecord(
          training_filepaths, coder=beam.coders.ProtoCoder(tf.train.Example))
     serving_examples = p | 'ReadServingData' >>
       beam.io.ReadFromTFRecord(
          serving_filepaths, coder=beam.coders.ProtoCoder(tf.train.Example))
     _ = ((training_examples, serving_examples) | 'DetectFeatureSkew' >>
       DetectFeatureSkew(identifier_features=['id1'], sample_size=5)
     | 'WriteFeatureSkewResultsOutput' >>
       tfdv.WriteFeatureSkewResultsToTFRecord(output_path)
     | 'WriteFeatureSkewPairsOutput' >>
     tfdv.WriteFeatureSkewPairsToTFRecord(output_path))

See the documentation for DetectFeatureSkewImpl for more detail about feature skew detection.

Initializes the feature skew detection PTransform.

PARAMETER DESCRIPTION
identifier_features

Names of features to use as identifiers.

TYPE: List[FeatureName]

features_to_ignore

Names of features for which no feature skew detection is done.

TYPE: Optional[List[FeatureName]] DEFAULT: None

sample_size

Size of the sample of training-serving example pairs that exhibit skew to include in the skew results.

TYPE: int DEFAULT: 0

float_round_ndigits

Number of digits precision after the decimal point to which to round float values before comparing them.

TYPE: Optional[int] DEFAULT: None

allow_duplicate_identifiers

If set, skew detection will be done on examples for which there are duplicate identifier feature values. In this case, the counts in the FeatureSkew result are based on each training-serving example pair analyzed. Examples with given identifier feature values must all fit in memory.

TYPE: bool DEFAULT: False

Source code in tensorflow_data_validation/api/validation_api.py
def __init__(
    self,
    identifier_features: List[types.FeatureName],
    features_to_ignore: Optional[List[types.FeatureName]] = None,
    sample_size: int = 0,
    float_round_ndigits: Optional[int] = None,
    allow_duplicate_identifiers: bool = False) -> None:
  """Initializes the feature skew detection PTransform.

  Args:
    identifier_features: Names of features to use as identifiers.
    features_to_ignore: Names of features for which no feature skew detection
      is done.
    sample_size: Size of the sample of training-serving example pairs that
      exhibit skew to include in the skew results.
    float_round_ndigits: Number of digits precision after the decimal point to
      which to round float values before comparing them.
    allow_duplicate_identifiers: If set, skew detection will be done on
      examples for which there are duplicate identifier feature values. In
      this case, the counts in the FeatureSkew result are based on each
      training-serving example pair analyzed. Examples with given identifier
      feature values must all fit in memory.
  """
  self._identifier_features = identifier_features
  self._features_to_ignore = features_to_ignore
  self._sample_size = sample_size
  self._float_round_ndigits = float_round_ndigits
  self._allow_duplicate_identifiers = allow_duplicate_identifiers
Functions
expand
expand(
    datasets: Tuple[PCollection, PCollection],
) -> Tuple[PCollection, PCollection]
Source code in tensorflow_data_validation/api/validation_api.py
def expand(
    self, datasets: Tuple[beam.pvalue.PCollection, beam.pvalue.PCollection]
) -> Tuple[beam.pvalue.PCollection, beam.pvalue.PCollection]:

  result = (
      datasets
      | 'DetectFeatureSkew' >> feature_skew_detector.DetectFeatureSkewImpl(
          self._identifier_features, self._features_to_ignore,
          self._sample_size, self._float_round_ndigits,
          self._allow_duplicate_identifiers))
  return result[feature_skew_detector.SKEW_RESULTS_KEY], result[
      feature_skew_detector.SKEW_PAIRS_KEY]

FeatureView

FeatureView(stats_proto: FeatureNameStatistics)

Bases: object

View of a single feature.

This class provides accessor methods, as well as access to the underlying proto. Where possible, accessors should be used in place of proto access (for example, x.numeric_statistics() instead of x.proto().num_stats) in order to support future extension of the proto.
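A small sketch of the accessor pattern (the proto below is built inline for illustration):

  import tensorflow_data_validation as tfdv
  from tensorflow_metadata.proto.v0 import statistics_pb2

  feature_stats = statistics_pb2.FeatureNameStatistics()
  feature_stats.path.step.append('age')
  feature_stats.num_stats.mean = 38.5

  view = tfdv.FeatureView(feature_stats)
  # Prefer accessors over direct proto access:
  assert view.numeric_statistics().mean == 38.5
  # Accessors for other statistics kinds return None when not populated:
  assert view.string_statistics() is None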

Source code in tensorflow_data_validation/utils/stats_util.py
def __init__(self, stats_proto: statistics_pb2.FeatureNameStatistics):
  self._statistics = stats_proto
Functions
bytes_statistics
bytes_statistics() -> Optional[BytesStatistics]

Retrieve byte statistics if available.

Source code in tensorflow_data_validation/utils/stats_util.py
def bytes_statistics(self) -> Optional[statistics_pb2.BytesStatistics]:
  """Retrieve byte statistics if available."""
  if self._statistics.WhichOneof('stats') == 'bytes_stats':
    return self._statistics.bytes_stats
  return None
common_statistics
common_statistics() -> Optional[CommonStatistics]

Retrieve common statistics if available.

Source code in tensorflow_data_validation/utils/stats_util.py
def common_statistics(self) -> Optional[statistics_pb2.CommonStatistics]:
  """Retrieve common statistics if available."""
  which = self._statistics.WhichOneof('stats')
  if which == 'num_stats':
    return self._statistics.num_stats.common_stats
  if which == 'string_stats':
    return self._statistics.string_stats.common_stats
  if which == 'bytes_stats':
    return self._statistics.bytes_stats.common_stats
  if which == 'struct_stats':
    return self._statistics.struct_stats.common_stats
  return None
custom_statistic
custom_statistic(name: str) -> Optional[CustomStatistic]

Retrieve a custom_statistic by name.

Source code in tensorflow_data_validation/utils/stats_util.py
def custom_statistic(self,
                     name: str) -> Optional[statistics_pb2.CustomStatistic]:
  """Retrieve a custom_statistic by name."""
  result = None
  for stat in self._statistics.custom_stats:
    if stat.name == name:
      if result is None:
        result = stat
      else:
        raise ValueError('Duplicate custom_stats for name %s' % name)
  return result
numeric_statistics
numeric_statistics() -> Optional[NumericStatistics]

Retrieve numeric statistics if available.

Source code in tensorflow_data_validation/utils/stats_util.py
def numeric_statistics(self) -> Optional[statistics_pb2.NumericStatistics]:
  """Retrieve numeric statistics if available."""
  if self._statistics.WhichOneof('stats') == 'num_stats':
    return self._statistics.num_stats
  return None
proto
proto() -> FeatureNameStatistics

Retrieve the underlying proto.

Source code in tensorflow_data_validation/utils/stats_util.py
def proto(self) -> statistics_pb2.FeatureNameStatistics:
  """Retrieve the underlying proto."""
  return self._statistics
string_statistics
string_statistics() -> Optional[StringStatistics]

Retrieve string statistics if available.

Source code in tensorflow_data_validation/utils/stats_util.py
def string_statistics(self) -> Optional[statistics_pb2.StringStatistics]:
  """Retrieve string statistics if available."""
  if self._statistics.WhichOneof('stats') == 'string_stats':
    return self._statistics.string_stats
  return None
struct_statistics
struct_statistics() -> Optional[StructStatistics]

Retrieve struct statistics if available.

Source code in tensorflow_data_validation/utils/stats_util.py
def struct_statistics(self) -> Optional[statistics_pb2.StructStatistics]:
  """Retrieve struct statistics if available."""
  if self._statistics.WhichOneof('stats') == 'struct_stats':
    return self._statistics.struct_stats
  return None

GenerateStatistics

GenerateStatistics(options: StatsOptions = StatsOptions())

Bases: PTransform

API for generating data statistics.

Example:

  with beam.Pipeline(runner=...) as p:
    _ = (p
         | 'ReadData' >> tfx_bsl.public.tfxio.TFExampleRecord(data_location)
             .BeamSource()
         | 'GenerateStatistics' >> GenerateStatistics()
         | 'WriteStatsOutput' >> tfdv.WriteStatisticsToTFRecord(output_path))

Initializes the transform.

PARAMETER DESCRIPTION
options

tfdv.StatsOptions for generating data statistics.

TYPE: StatsOptions DEFAULT: StatsOptions()

RAISES DESCRIPTION
TypeError

If options is not of the expected type.

Source code in tensorflow_data_validation/api/stats_api.py
def __init__(
    self,
    options: stats_options.StatsOptions = stats_options.StatsOptions()
) -> None:
  """Initializes the transform.

  Args:
    options: `tfdv.StatsOptions` for generating data statistics.

  Raises:
    TypeError: If options is not of the expected type.
  """
  if not isinstance(options, stats_options.StatsOptions):
    raise TypeError('options is of type %s, should be a StatsOptions.' %
                    type(options).__name__)
  self._options = options
Functions
expand
expand(
    dataset: PCollection[RecordBatch],
) -> PCollection[DatasetFeatureStatisticsList]
Source code in tensorflow_data_validation/api/stats_api.py
def expand(
    self, dataset: beam.PCollection[pa.RecordBatch]
) -> beam.PCollection[statistics_pb2.DatasetFeatureStatisticsList]:
  if self._options.sample_rate is not None:
    dataset |= ('SampleExamplesAtRate(%s)' % self._options.sample_rate >>
                beam.FlatMap(_sample_at_rate,
                             sample_rate=self._options.sample_rate))

  return (dataset | 'RunStatsGenerators' >>
          stats_impl.GenerateStatisticsImpl(self._options))

MergeDatasetFeatureStatisticsList

Bases: PTransform

API for merging sharded DatasetFeatureStatisticsList.

Functions
expand
expand(stats: PCollection)
Source code in tensorflow_data_validation/api/stats_api.py
def expand(self, stats: beam.PCollection):
  return stats | 'MergeDatasetFeatureStatisticsProtos' >> beam.CombineGlobally(
              merge_util.merge_dataset_feature_statistics_list)

StatsOptions

StatsOptions(
    generators: Optional[List[StatsGenerator]] = None,
    schema: Optional[Schema] = None,
    label_feature: Optional[FeatureName] = None,
    weight_feature: Optional[FeatureName] = None,
    slice_functions: Optional[List[SliceFunction]] = None,
    sample_rate: Optional[float] = None,
    num_top_values: int = 20,
    frequency_threshold: int = 1,
    weighted_frequency_threshold: float = 1.0,
    num_rank_histogram_buckets: int = 1000,
    num_values_histogram_buckets: int = 10,
    num_histogram_buckets: int = 10,
    num_quantiles_histogram_buckets: int = 10,
    epsilon: float = 0.01,
    infer_type_from_schema: bool = False,
    desired_batch_size: Optional[int] = None,
    enable_semantic_domain_stats: bool = False,
    semantic_domain_stats_sample_rate: Optional[
        float
    ] = None,
    per_feature_weight_override: Optional[
        Dict[FeaturePath, FeatureName]
    ] = None,
    vocab_paths: Optional[
        Dict[VocabName, VocabPath]
    ] = None,
    add_default_generators: bool = True,
    feature_allowlist: Optional[
        Union[List[FeatureName], List[FeaturePath]]
    ] = None,
    experimental_use_sketch_based_topk_uniques: Optional[
        bool
    ] = None,
    use_sketch_based_topk_uniques: Optional[bool] = None,
    experimental_slice_functions: Optional[
        List[SliceFunction]
    ] = None,
    experimental_slice_sqls: Optional[List[Text]] = None,
    experimental_result_partitions: int = 1,
    experimental_num_feature_partitions: int = 1,
    slicing_config: Optional[SlicingConfig] = None,
    experimental_filter_read_paths: bool = False,
    per_feature_stats_config: Optional[
        PerFeatureStatsConfig
    ] = None,
)

Bases: object

Options for generating statistics.
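A construction sketch, assuming the end-to-end entry point tfdv.generate_statistics_from_tfrecord; the feature names and file path below are hypothetical:

  import tensorflow_data_validation as tfdv

  options = tfdv.StatsOptions(
      num_top_values=50,
      num_histogram_buckets=20,
      feature_allowlist=['age', 'income'],  # hypothetical feature names
      use_sketch_based_topk_uniques=True,
  )
  stats = tfdv.generate_statistics_from_tfrecord(
      data_location='/path/to/examples.tfrecord',  # hypothetical path
      stats_options=options)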

Initializes statistics options.

PARAMETER DESCRIPTION
generators

An optional list of statistics generators. A statistics generator must extend either CombinerStatsGenerator or TransformStatsGenerator.

TYPE: Optional[List[StatsGenerator]] DEFAULT: None

schema

An optional tensorflow_metadata Schema proto. Currently we use the schema to infer categorical and bytes features.

TYPE: Optional[Schema] DEFAULT: None

label_feature

An optional feature name which represents the label.

TYPE: Optional[FeatureName] DEFAULT: None

weight_feature

An optional feature name whose numeric value represents the weight of an example.

TYPE: Optional[FeatureName] DEFAULT: None

slice_functions

DEPRECATED. Use experimental_slice_functions.

TYPE: Optional[List[SliceFunction]] DEFAULT: None

sample_rate

An optional sampling rate. If specified, statistics are computed over the sample.

TYPE: Optional[float] DEFAULT: None

num_top_values

An optional number of most frequent feature values to keep for string features.

TYPE: int DEFAULT: 20

frequency_threshold

An optional minimum number of examples the most frequent values must be present in.

TYPE: int DEFAULT: 1

weighted_frequency_threshold

An optional minimum weighted number of examples the most frequent weighted values must be present in. This option is only relevant when a weight_feature is specified.

TYPE: float DEFAULT: 1.0

num_rank_histogram_buckets

An optional number of buckets in the rank histogram for string features.

TYPE: int DEFAULT: 1000

num_values_histogram_buckets

An optional number of buckets in a quantiles histogram for the number of values per Feature, which is stored in CommonStatistics.num_values_histogram.

TYPE: int DEFAULT: 10

num_histogram_buckets

An optional number of buckets in a standard NumericStatistics.histogram with equal-width buckets.

TYPE: int DEFAULT: 10

num_quantiles_histogram_buckets

An optional number of buckets in a quantiles NumericStatistics.histogram.

TYPE: int DEFAULT: 10

epsilon

An optional error tolerance for the computation of quantiles, typically a small fraction close to zero (e.g. 0.01). Higher values of epsilon increase the quantile approximation error, and hence result in more unequal buckets, but could improve performance and reduce resource consumption.

TYPE: float DEFAULT: 0.01

infer_type_from_schema

A boolean to indicate whether the feature types should be inferred from the schema. If set to True, an input schema must be provided. This flag is used only when invoking TFDV through tfdv.generate_statistics_from_csv.

TYPE: bool DEFAULT: False

desired_batch_size

An optional maximum number of examples to include in each batch that is passed to the statistics generators. When invoking TFDV using its end-to-end APIs (e.g. generate_statistics_from_tfrecord), this option also controls the decoder batch size -- if provided, the decoded RecordBatches that are to be fed to TFDV will have the fixed batch size. When invoking TFDV using tfdv.GenerateStatistics, this option only controls the maximum size of RecordBatches constructed within StatsGenerators (a generator may combine RecordBatches).

TYPE: Optional[int] DEFAULT: None

enable_semantic_domain_stats

If True, statistics for semantic domains are generated (e.g., image and text domains).

TYPE: bool DEFAULT: False

semantic_domain_stats_sample_rate

An optional sampling rate for semantic domain statistics. If specified, semantic domain statistics are computed over a sample.

TYPE: Optional[float] DEFAULT: None

per_feature_weight_override

If specified, the "example weight" paired with a feature will first be looked up in this map and, if not found, will fall back to weight_feature.

TYPE: Optional[Dict[FeaturePath, FeatureName]] DEFAULT: None

vocab_paths

An optional dictionary mapping vocab names to paths. Used in the schema when specifying a NaturalLanguageDomain. The paths can either be to GZIP-compressed TF record files that have a tfrecord.gz suffix or to text files.

TYPE: Optional[Dict[VocabName, VocabPath]] DEFAULT: None

add_default_generators

Whether to invoke the default set of stats generators in the run. The generators invoked consist of 1) the default generators (controlled by this option); 2) user-provided generators (controlled by the generators option); 3) semantic generators (controlled by enable_semantic_domain_stats); and 4) schema-based generators that are enabled based on information provided in the schema.

TYPE: bool DEFAULT: True

feature_allowlist

An optional list of names of the features to calculate statistics for, or a list of paths.

TYPE: Optional[Union[List[FeatureName], List[FeaturePath]]] DEFAULT: None

experimental_use_sketch_based_topk_uniques

Deprecated, prefer use_sketch_based_topk_uniques.

TYPE: Optional[bool] DEFAULT: None

use_sketch_based_topk_uniques

If True, use the sketch-based top-k and uniques stats generator.

TYPE: Optional[bool] DEFAULT: None

experimental_slice_functions

An optional list of functions that generate slice keys for each example. Each slice function should take a pyarrow.RecordBatch as input and return an Iterable[Tuple[Text, pyarrow.RecordBatch]], where each tuple contains the slice key and the corresponding sliced RecordBatch. Only one of experimental_slice_functions or experimental_slice_sqls may be specified.

TYPE: Optional[List[SliceFunction]] DEFAULT: None

experimental_slice_sqls

List of slicing SQL queries. Each query must have the following pattern: "SELECT STRUCT({feature_name} [AS {slice_key}]) [FROM example.feature_name [, example.feature_name, ...] [WHERE ...]]". The "example.feature_name" inside the FROM statement is used to flatten repeated fields; for non-repeated fields, the query can be written directly, e.g. "SELECT STRUCT(non_repeated_feature_a, non_repeated_feature_b)". In the query, "example" is a keyword that binds to each input "row". The semantics of this variable depend on how the input data is decoded to the Arrow representation (e.g., for tf.Example, each key is decoded to a separate column), so structured data can be readily accessed by iterating/unnesting the fields of the "example" variable. Example 1, slice on each value of a feature: "SELECT STRUCT(gender) FROM example.gender". Example 2, slice on each value of one feature and a specified value of another: "SELECT STRUCT(gender, country) FROM example.gender, example.country WHERE country = 'USA'". Only one of experimental_slice_functions or experimental_slice_sqls may be specified.

TYPE: Optional[List[Text]] DEFAULT: None

experimental_result_partitions

The number of feature partitions to combine output DatasetFeatureStatisticsLists into. If set to 1 (the default), the output is globally combined. If set to a value greater than one, up to that many shards are returned, each containing a subset of features.

TYPE: int DEFAULT: 1

experimental_num_feature_partitions

If > 1, partitions computations by supported generators to act on this many bundles of features. For best results this should be several times smaller than the number of features in a dataset, and never more than the available Beam parallelism.

TYPE: int DEFAULT: 1

slicing_config

An optional SlicingConfig. SlicingConfig includes slicing_specs specified with feature keys, feature values, or slicing SQL queries.

TYPE: Optional[SlicingConfig] DEFAULT: None

experimental_filter_read_paths

If True, tries to push down the paths passed via feature_allowlist or via the schema (in that priority) to the underlying read operation. Support depends on the file reader.

TYPE: bool DEFAULT: False

per_feature_stats_config

Supports granular control of what statistics are enabled per feature. Experimental.

TYPE: Optional[PerFeatureStatsConfig] DEFAULT: None

Source code in tensorflow_data_validation/statistics/stats_options.py
def __init__(
    self,
    generators: Optional[List[stats_generator.StatsGenerator]] = None,
    schema: Optional[schema_pb2.Schema] = None,
    label_feature: Optional[types.FeatureName] = None,
    weight_feature: Optional[types.FeatureName] = None,
    slice_functions: Optional[List[types.SliceFunction]] = None,
    sample_rate: Optional[float] = None,
    num_top_values: int = 20,
    frequency_threshold: int = 1,
    weighted_frequency_threshold: float = 1.0,
    num_rank_histogram_buckets: int = 1000,
    num_values_histogram_buckets: int = 10,
    num_histogram_buckets: int = 10,
    num_quantiles_histogram_buckets: int = 10,
    epsilon: float = 0.01,
    infer_type_from_schema: bool = False,
    desired_batch_size: Optional[int] = None,
    enable_semantic_domain_stats: bool = False,
    semantic_domain_stats_sample_rate: Optional[float] = None,
    per_feature_weight_override: Optional[
        Dict[types.FeaturePath, types.FeatureName]
    ] = None,
    vocab_paths: Optional[Dict[types.VocabName, types.VocabPath]] = None,
    add_default_generators: bool = True,
    # TODO(b/255895499): Support "from schema" for feature_allowlist.
    feature_allowlist: Optional[
        Union[List[types.FeatureName], List[types.FeaturePath]]
    ] = None,
    experimental_use_sketch_based_topk_uniques: Optional[bool] = None,
    use_sketch_based_topk_uniques: Optional[bool] = None,
    experimental_slice_functions: Optional[List[types.SliceFunction]] = None,
    experimental_slice_sqls: Optional[List[Text]] = None,
    experimental_result_partitions: int = 1,
    experimental_num_feature_partitions: int = 1,
    slicing_config: Optional[slicing_spec_pb2.SlicingConfig] = None,
    experimental_filter_read_paths: bool = False,
    per_feature_stats_config: Optional[types.PerFeatureStatsConfig] = None,
):
  """Initializes statistics options.

  Args:
    generators: An optional list of statistics generators. A statistics
      generator must extend either CombinerStatsGenerator or
      TransformStatsGenerator.
    schema: An optional tensorflow_metadata Schema proto. Currently we use the
      schema to infer categorical and bytes features.
    label_feature: An optional feature name which represents the label.
    weight_feature: An optional feature name whose numeric value represents
      the weight of an example.
    slice_functions: DEPRECATED. Use `experimental_slice_functions`.
    sample_rate: An optional sampling rate. If specified, statistics is
      computed over the sample.
    num_top_values: An optional number of most frequent feature values to keep
      for string features.
    frequency_threshold: An optional minimum number of examples the most
      frequent values must be present in.
    weighted_frequency_threshold: An optional minimum weighted number of
      examples the most frequent weighted values must be present in. This
      option is only relevant when a weight_feature is specified.
    num_rank_histogram_buckets: An optional number of buckets in the rank
      histogram for string features.
    num_values_histogram_buckets: An optional number of buckets in a quantiles
      histogram for the number of values per Feature, which is stored in
      CommonStatistics.num_values_histogram.
    num_histogram_buckets: An optional number of buckets in a standard
      NumericStatistics.histogram with equal-width buckets.
    num_quantiles_histogram_buckets: An optional number of buckets in a
      quantiles NumericStatistics.histogram.
    epsilon: An optional error tolerance for the computation of quantiles,
      typically a small fraction close to zero (e.g. 0.01). Higher values of
      epsilon increase the quantile approximation, and hence result in more
      unequal buckets, but could improve performance, and resource
      consumption.
    infer_type_from_schema: A boolean to indicate whether the feature types
      should be inferred from the schema. If set to True, an input schema must
      be provided. This flag is used only when invoking TFDV through
      `tfdv.generate_statistics_from_csv`.
    desired_batch_size: An optional maximum number of examples to include in
      each batch that is passed to the statistics generators. When invoking
      TFDV using its end-to-end APIs (e.g.
      `generate_statistics_from_tfrecord`), this option also controls the
      decoder batch size -- if provided, the decoded RecordBatches that are to
      be fed to TFDV will have the fixed batch size. When invoking TFDV using
      `tfdv.GenerateStatistics`, this option only controls the maximum size of
      RecordBatches constructed within StatsGenerators (a generator may
      combine RecordBatches).
    enable_semantic_domain_stats: If True statistics for semantic domains are
      generated (e.g: image, text domains).
    semantic_domain_stats_sample_rate: An optional sampling rate for semantic
      domain statistics. If specified, semantic domain statistics is computed
      over a sample.
    per_feature_weight_override: If specified, the "example weight" paired
      with a feature will be first looked up in this map and if not found,
      fall back to `weight_feature`.
    vocab_paths: An optional dictionary mapping vocab names to paths. Used in
      the schema when specifying a NaturalLanguageDomain. The paths can either
      be to GZIP-compressed TF record files that have a tfrecord.gz suffix or
      to text files.
    add_default_generators: Whether to invoke the default set of stats
      generators in the run. Generators invoked consists of 1) the default
      generators (controlled by this option); 2) user-provided generators (
      controlled by the `generators` option); 3) semantic generators
      (controlled by `enable_semantic_domain_stats`) and 4) schema-based
      generators that are enabled based on information provided in the schema.
    feature_allowlist: An optional list of names of the features to calculate
      statistics for, or a list of paths.
    experimental_use_sketch_based_topk_uniques: Deprecated, prefer
      use_sketch_based_topk_uniques.
    use_sketch_based_topk_uniques: if True, use the sketch based top-k and
      uniques stats generator.
    experimental_slice_functions: An optional list of functions that generate
      slice keys for each example. Each slice function should take
      pyarrow.RecordBatch as input and return an Iterable[Tuple[Text,
      pyarrow.RecordBatch]]. Each tuple contains the slice key and the
      corresponding sliced RecordBatch. Only one of
      experimental_slice_functions or experimental_slice_sqls must be
      specified.
    experimental_slice_sqls: List of slicing SQL queries. The query must have
      the following pattern: "SELECT STRUCT({feature_name} [AS {slice_key}])
      [FROM example.feature_name [, example.feature_name, ... ] [WHERE ... ]]"
      The “example.feature_name” inside the FROM statement is used to flatten
      the repeated fields. For non-repeated fields, you can directly write the
      query as follows: “SELECT STRUCT(non_repeated_feature_a,
      non_repeated_feature_b)” In the query, the “example” is a key word that
      binds to each input "row". The semantics of this variable will depend on
      the decoding of the input data to the Arrow representation (e.g., for
      tf.Example, each key is decoded to a separate column). Thus, structured
      data can be readily accessed by iterating/unnesting the fields of the
      "example" variable. Example 1: Slice on each value of a feature "SELECT
      STRUCT(gender) FROM example.gender" Example 2: Slice on each value of
      one feature and a specified value of another. "SELECT STRUCT(gender,
      country) FROM example.gender, example.country WHERE country = 'USA'"
      Only one of experimental_slice_functions or experimental_slice_sqls must
      be specified.
    experimental_result_partitions: The number of feature partitions to
      combine output DatasetFeatureStatisticsLists into. If set to 1 (default)
      output is globally combined. If set to value greater than one, up to
      that many shards are returned, each containing a subset of features.
    experimental_num_feature_partitions: If > 1, partitions computations by
      supported generators to act on this many bundles of features. For best
      results this should be set to at least several times less than the
      number of features in a dataset, and never more than the available beam
      parallelism.
    slicing_config: an optional SlicingConfig. SlicingConfig includes
      slicing_specs specified with feature keys, feature values or slicing SQL
      queries.
    experimental_filter_read_paths: If provided, tries to push down either
      paths passed via feature_allowlist or via the schema (in that priority)
      to the underlying read operation. Support depends on the file reader.
    per_feature_stats_config: Supports granular control of what statistics are
      enabled per feature. Experimental.
  """
  self.generators = generators
  self.feature_allowlist = feature_allowlist
  self.schema = schema
  self.label_feature = label_feature
  self.weight_feature = weight_feature
  if slice_functions is not None and experimental_slice_functions is not None:
    raise ValueError(
        'Specify only one of slice_functions or experimental_slice_functions')
  self.experimental_slice_functions = None
  if slice_functions is not None:
    self.experimental_slice_functions = slice_functions
  elif experimental_slice_functions is not None:
    self.experimental_slice_functions = experimental_slice_functions
  self.sample_rate = sample_rate
  self.num_top_values = num_top_values
  self.frequency_threshold = frequency_threshold
  self.weighted_frequency_threshold = weighted_frequency_threshold
  self.num_rank_histogram_buckets = num_rank_histogram_buckets
  self.num_values_histogram_buckets = num_values_histogram_buckets
  self.num_histogram_buckets = num_histogram_buckets
  self.num_quantiles_histogram_buckets = num_quantiles_histogram_buckets
  self.epsilon = epsilon
  self.infer_type_from_schema = infer_type_from_schema
  self.desired_batch_size = desired_batch_size
  self.enable_semantic_domain_stats = enable_semantic_domain_stats
  self.semantic_domain_stats_sample_rate = semantic_domain_stats_sample_rate
  self._per_feature_weight_override = per_feature_weight_override
  self.vocab_paths = vocab_paths
  self.add_default_generators = add_default_generators
  if (use_sketch_based_topk_uniques is not None and
      experimental_use_sketch_based_topk_uniques is not None):
    raise ValueError(
        'Must set at most one of use_sketch_based_topk_uniques and'
        ' experimental_use_sketch_based_topk_uniques')
  # TODO(b/239609486): Change the None default to True.
  if (
      experimental_use_sketch_based_topk_uniques
      or use_sketch_based_topk_uniques
  ):
    self.use_sketch_based_topk_uniques = True
  else:
    self.use_sketch_based_topk_uniques = False
  self.experimental_slice_sqls = experimental_slice_sqls
  self.experimental_num_feature_partitions = (
      experimental_num_feature_partitions
  )
  self.experimental_result_partitions = experimental_result_partitions
  self.slicing_config = slicing_config
  self.experimental_filter_read_paths = experimental_filter_read_paths
  self.per_feature_stats_config = per_feature_stats_config
Attributes
add_default_generators property writable
add_default_generators: bool
desired_batch_size property writable
desired_batch_size: Optional[int]
enable_semantic_domain_stats instance-attribute
enable_semantic_domain_stats = enable_semantic_domain_stats
epsilon instance-attribute
epsilon = epsilon
example_weight_map property
example_weight_map
experimental_filter_read_paths property writable
experimental_filter_read_paths: bool
experimental_num_feature_partitions property writable
experimental_num_feature_partitions: int
experimental_result_partitions property writable
experimental_result_partitions: int
experimental_slice_functions property writable
experimental_slice_functions: Optional[List[SliceFunction]]
experimental_slice_sqls property writable
experimental_slice_sqls: Optional[List[Text]]
experimental_use_sketch_based_topk_uniques property writable
experimental_use_sketch_based_topk_uniques: bool
feature_allowlist property writable
feature_allowlist: Optional[
    Union[List[FeatureName], List[FeaturePath]]
]
frequency_threshold instance-attribute
frequency_threshold = frequency_threshold
generators property writable
generators: Optional[List[StatsGenerator]]
infer_type_from_schema instance-attribute
infer_type_from_schema = infer_type_from_schema
label_feature instance-attribute
label_feature = label_feature
num_histogram_buckets property writable
num_histogram_buckets: int
num_quantiles_histogram_buckets property writable
num_quantiles_histogram_buckets: int
num_rank_histogram_buckets instance-attribute
num_rank_histogram_buckets = num_rank_histogram_buckets
num_top_values instance-attribute
num_top_values = num_top_values
num_values_histogram_buckets property writable
num_values_histogram_buckets: int
per_feature_stats_config property writable
per_feature_stats_config: PerFeatureStatsConfig
sample_rate property writable
sample_rate: Optional[float]
schema property writable
schema: Optional[Schema]
semantic_domain_stats_sample_rate property writable
semantic_domain_stats_sample_rate: Optional[float]
slicing_config property writable
slicing_config: Optional[SlicingConfig]
use_sketch_based_topk_uniques property writable
use_sketch_based_topk_uniques: bool
vocab_paths property writable
vocab_paths: Optional[Dict[VocabName, VocabPath]]
weight_feature instance-attribute
weight_feature = weight_feature
weighted_frequency_threshold instance-attribute
weighted_frequency_threshold = weighted_frequency_threshold
Functions
from_json classmethod
from_json(options_json: Text) -> StatsOptions

Construct an instance of stats options from a JSON representation.

PARAMETER DESCRIPTION
options_json

A JSON representation of the __dict__ attribute of a StatsOptions instance.

TYPE: Text

RETURNS DESCRIPTION
StatsOptions

A StatsOptions instance constructed by setting the __dict__ attribute to the deserialized value of options_json.

Source code in tensorflow_data_validation/statistics/stats_options.py
@classmethod
def from_json(cls, options_json: Text) -> 'StatsOptions':
  """Construct an instance of stats options from a JSON representation.

  Args:
    options_json: A JSON representation of the __dict__ attribute of a
      StatsOptions instance.

  Returns:
    A StatsOptions instance constructed by setting the __dict__ attribute to
    the deserialized value of options_json.
  """
  options_dict = json.loads(options_json)
  type_name = options_dict.pop(_TYPE_NAME_KEY, None)
  if type_name is not None and type_name != 'StatsOptions':
    raise ValueError('JSON does not encode a StatsOptions')
  if _SCHEMA_JSON_KEY in options_dict:
    options_dict['_schema'] = json_format.Parse(
        options_dict[_SCHEMA_JSON_KEY], schema_pb2.Schema())
    del options_dict[_SCHEMA_JSON_KEY]
  if _SLICING_CONFIG_JSON_KEY in options_dict:
    options_dict['_slicing_config'] = json_format.Parse(
        options_dict[_SLICING_CONFIG_JSON_KEY],
        slicing_spec_pb2.SlicingConfig())
    del options_dict[_SLICING_CONFIG_JSON_KEY]
  per_feature_weight_override_json = options_dict.get(
      _PER_FEATURE_WEIGHT_OVERRIDE_JSON_KEY)
  if per_feature_weight_override_json is not None:
    options_dict['_per_feature_weight_override'] = {
        types.FeaturePath.from_json(k): v
        for k, v in per_feature_weight_override_json.items()
    }
    del options_dict[_PER_FEATURE_WEIGHT_OVERRIDE_JSON_KEY]
  options = cls()
  options.__dict__ = options_dict
  return options
to_json
to_json() -> Text

Convert the object to a JSON representation of its __dict__ attribute.

Custom generators and slice_functions cannot be converted. As a result, a ValueError will be raised when these options are specified and TFDV is running in a setting where the stats options have been JSON-serialized first. This happens, for example, when TFDV is run as a TFX component. The schema proto and slicing_config will be JSON-encoded.

RETURNS DESCRIPTION
Text

A JSON representation of a filtered version of __dict__.

Source code in tensorflow_data_validation/statistics/stats_options.py
def to_json(self) -> Text:
  """Convert from an object to JSON representation of the __dict__ attribute.

  Custom generators and slice_functions cannot being converted. As a result,
  a ValueError will be raised when these options are specified and TFDV is
  running in a setting where the stats options have been json-serialized,
  first. This will happen in the case where TFDV is run as a TFX component.
  The schema proto and slicing_config will be json_encoded.

  Returns:
    A JSON representation of a filtered version of __dict__.
  """
  options_dict = copy.copy(self.__dict__)
  options_dict[_TYPE_NAME_KEY] = 'StatsOptions'
  if options_dict['_slice_functions'] is not None:
    raise ValueError(
        'StatsOptions cannot be converted with experimental_slice_functions.'
    )
  if options_dict['_generators'] is not None:
    raise ValueError(
        'StatsOptions cannot be converted with generators.'
    )
  if self.schema is not None:
    del options_dict['_schema']
    options_dict[_SCHEMA_JSON_KEY] = json_format.MessageToJson(self.schema)
  if self.slicing_config is not None:
    del options_dict['_slicing_config']
    options_dict[_SLICING_CONFIG_JSON_KEY] = json_format.MessageToJson(
        self.slicing_config)
  if self._per_feature_weight_override is not None:
    del options_dict['_per_feature_weight_override']
    options_dict[_PER_FEATURE_WEIGHT_OVERRIDE_JSON_KEY] = {
        k.to_json(): v for k, v in self._per_feature_weight_override.items()
    }
  if self._per_feature_stats_config is not None:
    raise ValueError(
        'StatsOptions cannot be converted with per_feature_stats_config.'
    )
  return json.dumps(options_dict)
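A round-trip sketch; note that options carrying custom generators, experimental_slice_functions, or per_feature_stats_config cannot be serialized (to_json raises ValueError for them):

  import tensorflow_data_validation as tfdv

  options = tfdv.StatsOptions(num_top_values=10, num_histogram_buckets=20)
  options_json = options.to_json()
  restored = tfdv.StatsOptions.from_json(options_json)
  assert restored.num_top_values == 10
  assert restored.num_histogram_buckets == 20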

TransformStatsGenerator

TransformStatsGenerator(
    name: Text,
    ptransform: PTransform,
    schema: Optional[Schema] = None,
)

Bases: StatsGenerator

A StatsGenerator which wraps an arbitrary Beam PTransform.

This class computes statistics using a user-provided Beam PTransform. The PTransform must accept a Beam PCollection where each element is a tuple containing a slice key and an Arrow RecordBatch representing a batch of examples. It must return a PCollection where each element is a tuple containing a slice key and a DatasetFeatureStatistics proto representing the statistics of a slice.
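A minimal sketch of wrapping a user-defined PTransform; the generator name, feature path, and statistic name below are illustrative:

  import apache_beam as beam
  import tensorflow_data_validation as tfdv
  from tensorflow_metadata.proto.v0 import statistics_pb2

  def _to_stats_proto(slice_key, row_count):
    stats = statistics_pb2.DatasetFeatureStatistics()
    feature = stats.features.add()
    feature.path.step.append('__row_count__')  # hypothetical feature path
    custom = feature.custom_stats.add()
    custom.name = 'row_count'
    custom.num = row_count
    return slice_key, stats

  class _CountRowsPerSlice(beam.PTransform):
    """Counts RecordBatch rows per slice and emits one stats proto per slice."""

    def expand(self, sliced_record_batches):
      return (
          sliced_record_batches
          | 'ExtractRowCounts' >> beam.MapTuple(
              lambda slice_key, rb: (slice_key, rb.num_rows))
          | 'SumPerSlice' >> beam.CombinePerKey(sum)
          | 'ToStatsProtos' >> beam.MapTuple(_to_stats_proto))

  row_count_generator = tfdv.TransformStatsGenerator(
      name='RowCountStatsGenerator', ptransform=_CountRowsPerSlice())
  # Like other custom generators, it is passed via
  # tfdv.StatsOptions(generators=[row_count_generator]).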

Initializes a statistics generator.

PARAMETER DESCRIPTION
name

A unique name associated with the statistics generator.

TYPE: Text

schema

An optional schema for the dataset.

TYPE: Optional[Schema] DEFAULT: None

Source code in tensorflow_data_validation/statistics/generators/stats_generator.py
def __init__(self,
             name: Text,
             ptransform: beam.PTransform,
             schema: Optional[schema_pb2.Schema] = None) -> None:
  self._ptransform = ptransform
  super(TransformStatsGenerator, self).__init__(name, schema)
Attributes
name property
name
ptransform property
ptransform
schema property
schema
Functions

WriteStatisticsToBinaryFile

WriteStatisticsToBinaryFile(output_path: Text)

Bases: PTransform

API for writing serialized data statistics to a binary file.

Initializes the transform.

PARAMETER DESCRIPTION
output_path

Output path for writing data statistics.

TYPE: Text

Source code in tensorflow_data_validation/api/stats_api.py
def __init__(self, output_path: Text) -> None:
  """Initializes the transform.

  Args:
    output_path: Output path for writing data statistics.
  """
  self._output_path = output_path
Functions
expand
expand(stats: PCollection) -> PDone
Source code in tensorflow_data_validation/api/stats_api.py
def expand(self, stats: beam.PCollection) -> beam.pvalue.PDone:
  return (stats
          | 'WriteStats' >> beam.io.WriteToText(
              self._output_path,
              shard_name_template='',
              append_trailing_newlines=False,
              coder=beam.coders.ProtoCoder(
                  statistics_pb2.DatasetFeatureStatisticsList)))

WriteStatisticsToRecordsAndBinaryFile

WriteStatisticsToRecordsAndBinaryFile(
    binary_proto_path: str,
    records_path_prefix: str,
    columnar_path_prefix: Optional[str] = None,
)

Bases: PTransform

API for writing statistics to both sharded records and binary pb.

This PTransform assumes that the input represents sharded statistics, which are written directly. These statistics are also merged and written to a binary proto.

Currently Experimental.

TODO(b/202910677): After full migration to sharded stats, clean this up.
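A usage sketch, assuming sharded_stats is a PCollection of DatasetFeatureStatisticsList shards (for example, produced by GenerateStatistics with experimental_result_partitions > 1); the output paths are hypothetical:

  _ = (sharded_stats
       | 'WriteStats' >> tfdv.WriteStatisticsToRecordsAndBinaryFile(
           binary_proto_path='/output/stats.pb',
           records_path_prefix='/output/sharded_stats'))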

Initializes the transform.

PARAMETER DESCRIPTION
binary_proto_path

Output path for writing statistics as a binary proto.

TYPE: str

records_path_prefix

File pattern for writing statistics to sharded records.

TYPE: str

columnar_path_prefix

Optional file pattern for writing statistics to columnar outputs. If provided, columnar outputs will be written when supported.

TYPE: Optional[str] DEFAULT: None

Source code in tensorflow_data_validation/api/stats_api.py
def __init__(
    self,
    binary_proto_path: str,
    records_path_prefix: str,
    columnar_path_prefix: Optional[str] = None,
) -> None:
  """Initializes the transform.

  Args:
    binary_proto_path: Output path for writing statistics as a binary proto.
    records_path_prefix: File pattern for writing statistics to sharded
      records.
    columnar_path_prefix: Optional file pattern for writing statistics to
      columnar outputs. If provided, columnar outputs will be written when
      supported.
  """
  self._binary_proto_path = binary_proto_path
  self._records_path_prefix = records_path_prefix
  self._io_provider = artifacts_io_impl.get_io_provider()
  self._columnar_path_prefix = columnar_path_prefix
Functions
expand
expand(stats: PCollection) -> PDone
Source code in tensorflow_data_validation/api/stats_api.py
def expand(self, stats: beam.PCollection) -> beam.pvalue.PDone:
  # Write sharded outputs, ignoring PDone.
  _ = (
      stats | 'WriteShardedStats' >> self._io_provider.record_sink_impl(
          output_path_prefix=self._records_path_prefix))
  if self._columnar_path_prefix is not None:
    columnar_provider = artifacts_io_impl.get_default_columnar_provider()
    if columnar_provider is not None:
      _ = (
          stats | 'WriteColumnarStats' >> columnar_provider.record_sink_impl(
              self._columnar_path_prefix))
  return (stats
          | 'MergeDatasetFeatureStatisticsProtos' >> beam.CombineGlobally(
              merge_util.merge_dataset_feature_statistics_list)
          | 'WriteBinaryStats' >> WriteStatisticsToBinaryFile(
              self._binary_proto_path))

WriteStatisticsToTFRecord

WriteStatisticsToTFRecord(
    output_path: Text, sharded_output=False
)

Bases: PTransform

API for writing serialized data statistics to TFRecord file.

Initializes the transform.

PARAMETER DESCRIPTION
output_path

The output path or path prefix (if sharded_output=True).

TYPE: Text

sharded_output

If true, writes sharded TFRecord files of the form output_path-SSSSS-of-NNNNN.

DEFAULT: False

Source code in tensorflow_data_validation/api/stats_api.py
def __init__(self, output_path: Text, sharded_output=False) -> None:
  """Initializes the transform.

  Args:
    output_path: The output path or path prefix (if sharded_output=True).
    sharded_output: If true, writes sharded TFRecords files in the form
      output_path-SSSSS-of-NNNNN.
  """
  self._output_path = output_path
  self._sharded_output = sharded_output
Functions
expand
expand(stats: PCollection) -> PDone
Source code in tensorflow_data_validation/api/stats_api.py
def expand(self, stats: beam.PCollection) -> beam.pvalue.PDone:
  return (stats
          | 'WriteStats' >> beam.io.WriteToTFRecord(
              self._output_path,
              shard_name_template='' if not self._sharded_output else None,
              coder=beam.coders.ProtoCoder(
                  statistics_pb2.DatasetFeatureStatisticsList)))
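
A usage sketch showing the sharded form; the proto and paths are placeholders:

```python
import apache_beam as beam
import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import statistics_pb2

stats = statistics_pb2.DatasetFeatureStatisticsList()
stats.datasets.add(name='All Examples', num_examples=3)

with beam.Pipeline() as p:
  _ = (
      p
      | 'CreateStats' >> beam.Create([stats])
      # sharded_output=True produces files named
      # /tmp/example_stats.tfrecord-SSSSS-of-NNNNN.
      | 'WriteStats' >> tfdv.WriteStatisticsToTFRecord(
          '/tmp/example_stats.tfrecord', sharded_output=True))
```

With the default sharded_output=False, a single TFRecord file is written that can later be read back with load_statistics.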

Functions

compare_slices

compare_slices(
    statistics: DatasetFeatureStatisticsList,
    lhs_slice_key: Text,
    rhs_slice_key: Text,
)

Compare statistics of two slices using Facets.

PARAMETER DESCRIPTION
statistics

A DatasetFeatureStatisticsList protocol buffer.

TYPE: DatasetFeatureStatisticsList

lhs_slice_key

Slice key of the first slice.

TYPE: Text

rhs_slice_key

Slice key of the second slice.

TYPE: Text

RAISES DESCRIPTION
ValueError

If the input statistics proto does not have the specified slice statistics.

Source code in tensorflow_data_validation/utils/display_util.py
def compare_slices(
    statistics: statistics_pb2.DatasetFeatureStatisticsList,
    lhs_slice_key: Text,
    rhs_slice_key: Text,
):
  """Compare statistics of two slices using Facets.

  Args:
    statistics: A DatasetFeatureStatisticsList protocol buffer.
    lhs_slice_key: Slice key of the first slice.
    rhs_slice_key: Slice key of the second slice.

  Raises:
    ValueError: If the input statistics proto does not have the specified slice
      statistics.
  """
  lhs_stats = stats_util.get_slice_stats(statistics, lhs_slice_key)
  rhs_stats = stats_util.get_slice_stats(statistics, rhs_slice_key)
  visualize_statistics(
      lhs_stats, rhs_stats, lhs_name=lhs_slice_key, rhs_name=rhs_slice_key
  )
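
A usage sketch, assuming sliced_stats is a DatasetFeatureStatisticsList generated with slicing enabled and that the two slice keys exist in it; the path and slice keys are placeholders:

```python
import tensorflow_data_validation as tfdv

sliced_stats = tfdv.load_statistics('/tmp/sliced_stats.tfrecord')
tfdv.compare_slices(
    sliced_stats,
    lhs_slice_key='interests_dogs',
    rhs_slice_key='interests_cats')
```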

default_sharded_output_suffix

default_sharded_output_suffix() -> str

Returns the default sharded output suffix.

Source code in tensorflow_data_validation/api/stats_api.py
def default_sharded_output_suffix() -> str:
  """Returns the default sharded output suffix."""
  return artifacts_io_impl.get_io_provider().file_suffix()

default_sharded_output_supported

default_sharded_output_supported() -> bool

True if sharded output is supported by default.

Source code in tensorflow_data_validation/api/stats_api.py
def default_sharded_output_supported() -> bool:
  """True if sharded output is supported by default."""
  return artifacts_io_impl.should_write_sharded()

display_anomalies

display_anomalies(anomalies: Anomalies) -> None

Displays the input anomalies (for use in a Jupyter notebook).

PARAMETER DESCRIPTION
anomalies

An Anomalies protocol buffer.

TYPE: Anomalies

Source code in tensorflow_data_validation/utils/display_util.py
def display_anomalies(anomalies: anomalies_pb2.Anomalies) -> None:
  """Displays the input anomalies (for use in a Jupyter notebook).

  Args:
    anomalies: An Anomalies protocol buffer.
  """
  anomalies_df = get_anomalies_dataframe(anomalies)
  if anomalies_df.empty:
    display(HTML('<h4 style="color:green;">No anomalies found.</h4>'))
  else:
    display(anomalies_df)
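
A typical notebook usage sketch; the statistics and schema paths are placeholders:

```python
import tensorflow_data_validation as tfdv

stats = tfdv.load_statistics('/tmp/eval_stats.tfrecord')
schema = tfdv.load_schema_text('/tmp/schema.pbtxt')

# Validate the statistics against the schema and render the result.
anomalies = tfdv.validate_statistics(statistics=stats, schema=schema)
tfdv.display_anomalies(anomalies)
```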

display_schema

display_schema(schema: Schema) -> None

Displays the input schema (for use in a Jupyter notebook).

PARAMETER DESCRIPTION
schema

A Schema protocol buffer.

TYPE: Schema

Source code in tensorflow_data_validation/utils/display_util.py
def display_schema(schema: schema_pb2.Schema) -> None:
  """Displays the input schema (for use in a Jupyter notebook).

  Args:
    schema: A Schema protocol buffer.
  """
  features_df, domains_df = get_schema_dataframe(schema)
  display(features_df)
  # Do not truncate columns.
  if not domains_df.empty:
    pd.set_option('display.max_colwidth', None)
    display(domains_df)

experimental_get_feature_value_slicer

experimental_get_feature_value_slicer(
    features: Dict[FeatureName, Optional[_ValueType]],
) -> SliceFunction

Returns a function that generates sliced record batches for a given one.

The returned function produces sliced record batches based on the combination of all features specified in features. To slice on features separately (e.g., slice on the age feature and, separately, on the interests feature), you must use separate slice functions.

Examples:

Slice on each value of the specified features.

slice_fn = get_feature_value_slicer(features={'age': None, 'interests': None})

Slice on a specified feature value.

slice_fn = get_feature_value_slicer(features={'interests': ['dogs']})

Slice on each value of one feature and a specified value of another.

slice_fn = get_feature_value_slicer(features={'fruits': None, 'numbers': [1]})

PARAMETER DESCRIPTION
features

A mapping of features to an optional iterable of values that the returned function will slice on. If values is None for a feature, then the slice keys will reflect each distinct value found for that feature in the input record batch. If values are specified for a feature, then the slice keys will reflect only those values for the feature, if found in the input record batch. Values must be an iterable of strings or integers.

TYPE: Dict[FeatureName, Optional[_ValueType]]

RETURNS DESCRIPTION
SliceFunction

A function that takes as input a single record batch and returns a list of

SliceFunction

sliced record batches (slice_key, record_batch).

RAISES DESCRIPTION
TypeError

If feature values are not specified in an iterable.

NotImplementedError

If a value of a type other than string or integer is specified in the values iterable in features.

Source code in tensorflow_data_validation/utils/slicing_util.py
def get_feature_value_slicer(
    features: Dict[types.FeatureName, Optional[_ValueType]]
) -> types.SliceFunction:
  """Returns a function that generates sliced record batches for a given one.

  The returned function returns sliced record batches based on the combination
  of all features specified in `features`. To slice on features separately (
  e.g., slice on age feature and separately slice on interests feature), you
  must use separate slice functions.

  Examples:
  # Slice on each value of the specified features.
  slice_fn = get_feature_value_slicer(
      features={'age': None, 'interests': None})

  # Slice on a specified feature value.
  slice_fn = get_feature_value_slicer(features={'interests': ['dogs']})

  # Slice on each value of one feature and a specified value of another.
  slice_fn = get_feature_value_slicer(
      features={'fruits': None, 'numbers': [1]})

  Args:
    features: A mapping of features to an optional iterable of values that the
      returned function will slice on. If values is None for a feature, then the
      slice keys will reflect each distinct value found for that feature in the
      input record batch. If values are specified for a feature, then the slice
      keys will reflect only those values for the feature, if found in the input
      record batch. Values must be an iterable of strings or integers.

  Returns:
    A function that takes as input a single record batch and returns a list of
    sliced record batches (slice_key, record_batch).

  Raises:
    TypeError: If feature values are not specified in an iterable.
    NotImplementedError: If a value of a type other than string or integer is
      specified in the values iterable in `features`.
  """
  for values in features.values():
    if values is not None:
      if not isinstance(values, abc.Iterable):
        raise TypeError('Feature values must be specified in an iterable.')
      for value in values:
        if (not isinstance(value, (six.string_types, six.binary_type)) and
            not isinstance(value, int)):
          raise NotImplementedError(
              'Only string and int values are supported as the slice value.')
  # Extract the unique slice values per feature.
  for feature_name in features:
    if features[feature_name] is not None:
      features[feature_name] = set(features[feature_name])

  def feature_value_slicer(record_batch: pa.RecordBatch) -> Iterable[
      types.SlicedRecordBatch]:
    """A function that generates sliced record batches.

    The naive approach of doing this would be to iterate each row, identify
    slice keys for the row and keep track of index ranges for each slice key.
    And then generate an arrow record batch for each slice key based on the
    index ranges. This would be expensive as we are identifying the slice keys
    for each row individually and we would have to loop over the feature values
    including crossing them when we have to slice on multiple features. The
    current approach generates the slice keys for a batch by performing joins
    over indices of individual features. And then groups the joined record batch
    by slice key to get the row indices corresponding to a slice.

    Args:
      record_batch: Arrow RecordBatch.

    Yields:
      Sliced record batch (slice_key, record_batch) where record_batch contains
      the rows corresponding to a slice.
    """
    per_feature_parent_indices = []
    for feature_name, values in six.iteritems(features):
      feature_array = arrow_util.get_column(
          record_batch, feature_name, missing_ok=True)
      # If the feature name does not appear in the schema for this record batch,
      # drop it from the set of sliced features.
      if feature_array is None:
        continue

      # convert values from list[str] to list[int] if the feature type
      # is integer.
      if values is not None:
        feature_type = stats_util.get_feature_type_from_arrow_type(
            types.FeaturePath([feature_name]), feature_array.type)
        if feature_type == statistics_pb2.FeatureNameStatistics.INT:
          try:
            values = [int(value) for value in values]
          except ValueError as e:
            raise ValueError(
                'The feature to slice on has integer values but '
                'the provided slice values are not valid integers.') from e

      flattened, value_parent_indices = array_util.flatten_nested(
          feature_array, True)
      non_missing_values = np.asarray(flattened)
      # Create dataframe with feature value and parent index.
      df = pd.DataFrame({
          feature_name: non_missing_values,
          _PARENT_INDEX_COLUMN: value_parent_indices
      })
      df.drop_duplicates(inplace=True)
      # Filter based on slice values
      if values is not None:
        df = df.loc[df[feature_name].isin(values)]
      per_feature_parent_indices.append(df)
    # If there are no features to slice on, yield no output.
    # TODO(b/200081813): Produce output with an appropriate placeholder key.
    if not per_feature_parent_indices:
      return
    # Join dataframes based on parent indices.
    # Note that we want the parent indices per slice key to be sorted in the
    # merged dataframe. The individual dataframes have the parent indices in
    # sorted order. We use "inner" join type to preserve the order of the left
    # keys (also note that same parent index rows would be consecutive). Hence
    # we expect the merged dataframe to have sorted parent indices per
    # slice key.
    merged_df = functools.reduce(
        lambda base, update: pd.merge(base, update, how='inner',  # pylint: disable=g-long-lambda
                                      on=_PARENT_INDEX_COLUMN),
        per_feature_parent_indices)

    # Construct a new column in the merged dataframe with the slice keys.
    merged_df[_SLICE_KEY_COLUMN] = ''
    index = 0
    for col_name in sorted(merged_df.columns):
      if col_name in [_PARENT_INDEX_COLUMN, _SLICE_KEY_COLUMN]:
        continue
      feature_value_part = merged_df[col_name].apply(_to_slice_key)
      if feature_value_part.empty:
        feature_value_part = feature_value_part.astype(pd.StringDtype())
      slice_key_col = _to_slice_key(col_name) + '_' + feature_value_part
      if index == 0:
        merged_df[_SLICE_KEY_COLUMN] = slice_key_col
        index += 1
      else:
        merged_df[_SLICE_KEY_COLUMN] += ('_' + slice_key_col)

    # Since the parent indices are sorted per slice key, the groupby would
    # preserve the sorted order within each group.
    per_slice_parent_indices = merged_df.groupby(
        _SLICE_KEY_COLUMN, sort=False)[_PARENT_INDEX_COLUMN]
    for slice_key, parent_indices in per_slice_parent_indices:
      yield (slice_key,
             table_util.RecordBatchTake(record_batch,
                                        pa.array(parent_indices.to_numpy())))

  return feature_value_slicer
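
A usage sketch that plugs the slicer into statistics generation, assuming a TFDV version whose StatsOptions accepts experimental_slice_functions; the feature names, values, and data location are placeholders:

```python
import tensorflow_data_validation as tfdv

# Slice on every value of 'interests' crossed with age == 30.
slice_fn = tfdv.experimental_get_feature_value_slicer(
    features={'interests': None, 'age': [30]})

stats_options = tfdv.StatsOptions(
    experimental_slice_functions=[slice_fn])
sliced_stats = tfdv.generate_statistics_from_tfrecord(
    data_location='/tmp/train*.tfrecord',
    stats_options=stats_options)
```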

generate_dummy_schema_with_paths

generate_dummy_schema_with_paths(
    paths: List[FeaturePath],
) -> Schema

Generate a schema with the requested paths and no other information.

Source code in tensorflow_data_validation/utils/schema_util.py
def generate_dummy_schema_with_paths(
    paths: List[types.FeaturePath]) -> schema_pb2.Schema:
  """Generate a schema with the requested paths and no other information."""
  schema = schema_pb2.Schema()
  tree = _paths_to_tree(paths)

  def _add(container, name, children):
    container.feature.add(name=name)
    if children:
      for child_name, grandchildren in children.items():
        _add(container.feature[-1].struct_domain, child_name, grandchildren)

  for name, children in tree.items():
    _add(schema, name, children)
  return schema

generate_statistics_from_csv

generate_statistics_from_csv(
    data_location: Text,
    column_names: Optional[List[FeatureName]] = None,
    delimiter: Text = ",",
    output_path: Optional[bytes] = None,
    stats_options: StatsOptions = StatsOptions(),
    pipeline_options: Optional[PipelineOptions] = None,
    compression_type: Text = AUTO,
) -> DatasetFeatureStatisticsList

Compute data statistics from CSV files.

Runs a Beam pipeline to compute the data statistics and return the result data statistics proto.

This is a convenience method for users with data in CSV format. Users with data in unsupported file/data formats, or users who wish to create their own Beam pipelines need to use the 'GenerateStatistics' PTransform API directly instead.

PARAMETER DESCRIPTION
data_location

The location of the input data files.

TYPE: Text

column_names

A list of column names to be treated as the CSV header. Order must match the order in the input CSV files. If this argument is not specified, the first line in the input CSV files is assumed to be the header. Note that this option is valid only for 'csv' input file format.

TYPE: Optional[List[FeatureName]] DEFAULT: None

delimiter

A one-character string used to separate fields in a CSV file.

TYPE: Text DEFAULT: ','

output_path

The file path to output data statistics result to. If None, we use a temporary directory. It will be a TFRecord file containing a single data statistics proto, and can be read with the 'load_statistics' API. If you run this function on Google Cloud, you must specify an output_path. Specifying None may cause an error.

TYPE: Optional[bytes] DEFAULT: None

stats_options

tfdv.StatsOptions for generating data statistics.

TYPE: StatsOptions DEFAULT: StatsOptions()

pipeline_options

Optional beam pipeline options. This allows users to specify various beam pipeline execution parameters like pipeline runner (DirectRunner or DataflowRunner), cloud dataflow service project id, etc. See https://cloud.google.com/dataflow/pipelines/specifying-exec-params for more details.

TYPE: Optional[PipelineOptions] DEFAULT: None

compression_type

Used to handle compressed input files. Default value is CompressionTypes.AUTO, in which case the file_path's extension will be used to detect the compression.

TYPE: Text DEFAULT: AUTO

RETURNS DESCRIPTION
DatasetFeatureStatisticsList

A DatasetFeatureStatisticsList proto.

Source code in tensorflow_data_validation/utils/stats_gen_lib.py
def generate_statistics_from_csv(
    data_location: Text,
    column_names: Optional[List[types.FeatureName]] = None,
    delimiter: Text = ',',
    output_path: Optional[bytes] = None,
    stats_options: options.StatsOptions = options.StatsOptions(),
    pipeline_options: Optional[PipelineOptions] = None,
    compression_type: Text = CompressionTypes.AUTO,
) -> statistics_pb2.DatasetFeatureStatisticsList:
  """Compute data statistics from CSV files.

  Runs a Beam pipeline to compute the data statistics and return the result
  data statistics proto.

  This is a convenience method for users with data in CSV format.
  Users with data in unsupported file/data formats, or users who wish
  to create their own Beam pipelines need to use the 'GenerateStatistics'
  PTransform API directly instead.

  Args:
    data_location: The location of the input data files.
    column_names: A list of column names to be treated as the CSV header. Order
      must match the order in the input CSV files. If this argument is not
      specified, we assume the first line in the input CSV files as the
      header. Note that this option is valid only for 'csv' input file format.
    delimiter: A one-character string used to separate fields in a CSV file.
    output_path: The file path to output data statistics result to. If None, we
      use a temporary directory. It will be a TFRecord file containing a single
      data statistics proto, and can be read with the 'load_statistics' API.
      If you run this function on Google Cloud, you must specify an
      output_path. Specifying None may cause an error.
    stats_options: `tfdv.StatsOptions` for generating data statistics.
    pipeline_options: Optional beam pipeline options. This allows users to
      specify various beam pipeline execution parameters like pipeline runner
      (DirectRunner or DataflowRunner), cloud dataflow service project id, etc.
      See https://cloud.google.com/dataflow/pipelines/specifying-exec-params for
      more details.
    compression_type: Used to handle compressed input files. Default value is
      CompressionTypes.AUTO, in which case the file_path's extension will be
      used to detect the compression.

  Returns:
    A DatasetFeatureStatisticsList proto.
  """
  if output_path is None:
    output_path = os.path.join(tempfile.mkdtemp(), 'data_stats.tfrecord')
  output_dir_path = os.path.dirname(output_path)
  if not tf.io.gfile.exists(output_dir_path):
    tf.io.gfile.makedirs(output_dir_path)

  batch_size = (
      stats_options.desired_batch_size if stats_options.desired_batch_size
      and stats_options.desired_batch_size > 0 else
      constants.DEFAULT_DESIRED_INPUT_BATCH_SIZE)
  # PyLint doesn't understand Beam PTransforms.
  # pylint: disable=no-value-for-parameter
  with beam.Pipeline(options=pipeline_options) as p:
    # If a header is not provided, assume the first line in a file
    # to be the header.
    skip_header_lines = 1 if column_names is None else 0
    if column_names is None:
      column_names = get_csv_header(data_location, delimiter, compression_type)
    _ = (
        p
        | 'ReadData' >> beam.io.textio.ReadFromText(
            file_pattern=data_location,
            skip_header_lines=skip_header_lines,
            compression_type=compression_type)
        | 'DecodeData' >> csv_decoder.DecodeCSV(
            column_names=column_names,
            delimiter=delimiter,
            schema=stats_options.schema
            if stats_options.infer_type_from_schema else None,
            desired_batch_size=batch_size)
        | 'GenerateStatistics' >> stats_api.GenerateStatistics(stats_options)
        | 'WriteStatsOutput' >> stats_api.WriteStatisticsToTFRecord(
            output_path))
  return stats_util.load_statistics(output_path)
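
A usage sketch; the data location and output path are placeholders:

```python
import tensorflow_data_validation as tfdv

train_stats = tfdv.generate_statistics_from_csv(
    data_location='/tmp/train_data/*.csv',
    delimiter=',',
    output_path='/tmp/train_stats.tfrecord')
tfdv.visualize_statistics(train_stats)
```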

generate_statistics_from_dataframe

generate_statistics_from_dataframe(
    dataframe: DataFrame,
    stats_options: StatsOptions = StatsOptions(),
    n_jobs: int = 1,
) -> DatasetFeatureStatisticsList

Compute data statistics for the input pandas DataFrame.

This is a utility function for users with in-memory data represented as a pandas DataFrame.

This function supports only DataFrames with columns of primitive string or numeric types. DataFrames with multivalent features or holding non-string object types are not supported.

PARAMETER DESCRIPTION
dataframe

Input pandas DataFrame.

TYPE: DataFrame

stats_options

tfdv.StatsOptions for generating data statistics.

TYPE: StatsOptions DEFAULT: StatsOptions()

n_jobs

Number of processes to run (defaults to 1). If -1 is provided, uses the same number of processes as the number of CPU cores.

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
DatasetFeatureStatisticsList

A DatasetFeatureStatisticsList proto.

Source code in tensorflow_data_validation/utils/stats_gen_lib.py
def generate_statistics_from_dataframe(
    dataframe: DataFrame,
    stats_options: options.StatsOptions = options.StatsOptions(),
    n_jobs: int = 1
) -> statistics_pb2.DatasetFeatureStatisticsList:
  """Compute data statistics for the input pandas DataFrame.

  This is a utility function for users with in-memory data represented
  as a pandas DataFrame.

  This function supports only DataFrames with columns of primitive string or
  numeric types. DataFrames with multivalent features or holding non-string
  object types are not supported.

  Args:
    dataframe: Input pandas DataFrame.
    stats_options: `tfdv.StatsOptions` for generating data statistics.
    n_jobs: Number of processes to run (defaults to 1). If -1 is provided,
      uses the same number of processes as the number of CPU cores.

  Returns:
    A DatasetFeatureStatisticsList proto.
  """
  if not isinstance(dataframe, DataFrame):
    raise TypeError('dataframe argument is of type {}. Must be a '
                    'pandas DataFrame.'.format(type(dataframe).__name__))

  stats_generators = cast(
      List[stats_generator.CombinerStatsGenerator],
      stats_impl.get_generators(stats_options, in_memory=True))
  if n_jobs < -1 or n_jobs == 0:
    raise ValueError('Invalid n_jobs parameter {}. Should be either '
                     ' -1 or >= 1.'.format(n_jobs))

  if n_jobs == -1:
    n_jobs = multiprocessing.cpu_count()
  n_jobs = max(min(n_jobs, multiprocessing.cpu_count()), 1)

  if n_jobs == 1:
    merged_partial_stats = _generate_partial_statistics_from_df(
        dataframe, stats_options, stats_generators)
  else:
    # TODO(b/144580609): Consider using Beam for inmemory mode as well.
    splits = np.array_split(dataframe, n_jobs)
    partial_stats = Parallel(n_jobs=n_jobs)(
        delayed(_generate_partial_statistics_from_df)(
            splits[i], stats_options, stats_generators) for i in range(n_jobs))
    merged_partial_stats = [
        gen.merge_accumulators(stats)
        for gen, stats in zip(stats_generators, zip(*partial_stats))
    ]
  return stats_impl.extract_statistics_output(
      merged_partial_stats, stats_generators)
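
A usage sketch with a small in-memory DataFrame (the columns are illustrative):

```python
import pandas as pd
import tensorflow_data_validation as tfdv

df = pd.DataFrame({
    'age': [25, 32, 47],
    'country': ['US', 'CA', 'US'],
})
stats = tfdv.generate_statistics_from_dataframe(df, n_jobs=1)
schema = tfdv.infer_schema(stats)
```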

generate_statistics_from_tfrecord

generate_statistics_from_tfrecord(
    data_location: Text,
    output_path: Optional[bytes] = None,
    stats_options: StatsOptions = StatsOptions(),
    pipeline_options: Optional[PipelineOptions] = None,
) -> DatasetFeatureStatisticsList

Compute data statistics from TFRecord files containing TFExamples.

Runs a Beam pipeline to compute the data statistics and return the result data statistics proto.

This is a convenience method for users with data in TFRecord format. Users with data in unsupported file/data formats, or users who wish to create their own Beam pipelines need to use the 'GenerateStatistics' PTransform API directly instead.

PARAMETER DESCRIPTION
data_location

The location of the input data files.

TYPE: Text

output_path

The file path to output data statistics result to. If None, we use a temporary directory. It will be a TFRecord file containing a single data statistics proto, and can be read with the 'load_statistics' API. If you run this function on Google Cloud, you must specify an output_path. Specifying None may cause an error.

TYPE: Optional[bytes] DEFAULT: None

stats_options

tfdv.StatsOptions for generating data statistics.

TYPE: StatsOptions DEFAULT: StatsOptions()

pipeline_options

Optional beam pipeline options. This allows users to specify various beam pipeline execution parameters like pipeline runner (DirectRunner or DataflowRunner), cloud dataflow service project id, etc. See https://cloud.google.com/dataflow/pipelines/specifying-exec-params for more details.

TYPE: Optional[PipelineOptions] DEFAULT: None

RETURNS DESCRIPTION
DatasetFeatureStatisticsList

A DatasetFeatureStatisticsList proto.

Source code in tensorflow_data_validation/utils/stats_gen_lib.py
def generate_statistics_from_tfrecord(
    data_location: Text,
    output_path: Optional[bytes] = None,
    stats_options: options.StatsOptions = options.StatsOptions(),
    pipeline_options: Optional[PipelineOptions] = None,
) -> statistics_pb2.DatasetFeatureStatisticsList:
  """Compute data statistics from TFRecord files containing TFExamples.

  Runs a Beam pipeline to compute the data statistics and return the result
  data statistics proto.

  This is a convenience method for users with data in TFRecord format.
  Users with data in unsupported file/data formats, or users who wish
  to create their own Beam pipelines need to use the 'GenerateStatistics'
  PTransform API directly instead.

  Args:
    data_location: The location of the input data files.
    output_path: The file path to output data statistics result to. If None, we
      use a temporary directory. It will be a TFRecord file containing a single
      data statistics proto, and can be read with the 'load_statistics' API.
      If you run this function on Google Cloud, you must specify an
      output_path. Specifying None may cause an error.
    stats_options: `tfdv.StatsOptions` for generating data statistics.
    pipeline_options: Optional beam pipeline options. This allows users to
      specify various beam pipeline execution parameters like pipeline runner
      (DirectRunner or DataflowRunner), cloud dataflow service project id, etc.
      See https://cloud.google.com/dataflow/pipelines/specifying-exec-params for
      more details.

  Returns:
    A DatasetFeatureStatisticsList proto.
  """
  if output_path is None:
    output_path = os.path.join(tempfile.mkdtemp(), 'data_stats.tfrecord')
  output_dir_path = os.path.dirname(output_path)
  if not tf.io.gfile.exists(output_dir_path):
    tf.io.gfile.makedirs(output_dir_path)

  batch_size = stats_options.desired_batch_size
  # PyLint doesn't understand Beam PTransforms.
  # pylint: disable=no-value-for-parameter
  with beam.Pipeline(options=pipeline_options) as p:
    # Auto detect tfrecord file compression format based on input data
    # path suffix.
    _ = (
        p
        | 'ReadData' >> (tf_example_record.TFExampleRecord(
            file_pattern=data_location,
            schema=None,
            telemetry_descriptors=['tfdv', 'generate_statistics_from_tfrecord'])
                         .BeamSource(batch_size))
        | 'GenerateStatistics' >> stats_api.GenerateStatistics(stats_options)
        | 'WriteStatsOutput' >>
        (stats_api.WriteStatisticsToTFRecord(output_path)))
  return stats_util.load_statistics(output_path)
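
A usage sketch comparing training and evaluation statistics; the data locations are placeholders:

```python
import tensorflow_data_validation as tfdv

train_stats = tfdv.generate_statistics_from_tfrecord(
    data_location='/tmp/train*.tfrecord',
    output_path='/tmp/train_stats.tfrecord')
eval_stats = tfdv.generate_statistics_from_tfrecord(
    data_location='/tmp/eval*.tfrecord')
tfdv.visualize_statistics(
    lhs_statistics=eval_stats, rhs_statistics=train_stats,
    lhs_name='EVAL', rhs_name='TRAIN')
```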

get_confusion_count_dataframes

get_confusion_count_dataframes(
    confusion: Iterable[ConfusionCount],
) -> Dict[str, DataFrame]

Returns a pandas dataframe representation of a sequence of ConfusionCount.

PARAMETER DESCRIPTION
confusion

An iterable over ConfusionCount protos.

TYPE: Iterable[ConfusionCount]

RETURNS DESCRIPTION
Dict[str, DataFrame]

A map from feature name to a pandas dataframe containing match counts along with base and test counts for all unequal value pairs in the input.

Source code in tensorflow_data_validation/utils/display_util.py
def get_confusion_count_dataframes(
    confusion: Iterable[feature_skew_results_pb2.ConfusionCount],
) -> Dict[str, pd.DataFrame]:
  """Returns a pandas dataframe representation of a sequence of ConfusionCount.

  Args:
    confusion: An interable over ConfusionCount protos.
  Returns: A map from feature name to a pandas dataframe containing match counts
    along with base and test counts for all unequal value pairs in the input.
  """
  confusion = list(confusion)
  confusion_per_feature = collections.defaultdict(list)
  for c in confusion:
    confusion_per_feature[c.feature_name].append(c)

  def _build_df(confusion):
    base_count_per_value = collections.defaultdict(lambda: 0)
    test_count_per_value = collections.defaultdict(lambda: 0)
    value_counts = []
    for c in confusion:
      base_count_per_value[c.base.bytes_value] += c.count
      test_count_per_value[c.test.bytes_value] += c.count
      value_counts.append((c.base.bytes_value, c.test.bytes_value, c.count))
    df = pd.DataFrame(
        value_counts, columns=('Base value', 'Test value', 'Pair count')
    )
    df['Base count'] = df['Base value'].apply(lambda x: base_count_per_value[x])
    df['Test count'] = df['Test value'].apply(lambda x: test_count_per_value[x])
    df['Fraction of base'] = df['Pair count'] / df['Base count']
    df = (
        df[df['Base value'] != df['Test value']]
        .sort_values(['Base value', 'Fraction of base'])
        .reset_index(drop=True)
    )
    return df[
        ['Base value', 'Test value', 'Pair count', 'Base count', 'Test count']
    ]

  return {k: _build_df(v) for k, v in confusion_per_feature.items()}

get_domain

get_domain(
    schema: Schema,
    feature_path: Union[FeatureName, FeaturePath],
) -> Any

Get the domain associated with the input feature from the schema.

PARAMETER DESCRIPTION
schema

A Schema protocol buffer.

TYPE: Schema

feature_path

The path of the feature whose domain needs to be found. If a FeatureName is passed, a one-step FeaturePath will be constructed and used. For example, "my_feature" -> types.FeaturePath(["my_feature"])

TYPE: Union[FeatureName, FeaturePath]

RETURNS DESCRIPTION
Any

The domain protocol buffer associated with the input feature.

RAISES DESCRIPTION
TypeError

If the input schema is not of the expected type.

ValueError

If the input feature is not found in the schema or there is no domain associated with the feature.

Source code in tensorflow_data_validation/utils/schema_util.py
def get_domain(
    schema: schema_pb2.Schema, feature_path: Union[types.FeatureName,
                                                   types.FeaturePath]) -> Any:
  """Get the domain associated with the input feature from the schema.

  Args:
    schema: A Schema protocol buffer.
    feature_path: The path of the feature whose domain needs to be found. If a
      FeatureName is passed, a one-step FeaturePath will be constructed and
      used. For example, "my_feature" -> types.FeaturePath(["my_feature"])

  Returns:
    The domain protocol buffer associated with the input feature.

  Raises:
    TypeError: If the input schema is not of the expected type.
    ValueError: If the input feature is not found in the schema or there is
        no domain associated with the feature.
  """
  if not isinstance(schema, schema_pb2.Schema):
    raise TypeError('schema is of type %s, should be a Schema proto.' %
                    type(schema).__name__)

  feature = get_feature(schema, feature_path)
  domain_info = feature.WhichOneof('domain_info')

  if domain_info is None:
    raise ValueError('Feature %s has no domain associated with it.' %
                     feature_path)

  if domain_info != 'domain':
    return getattr(feature, domain_info)
  for domain in schema.string_domain:
    if domain.name == feature.domain:
      return domain

  raise ValueError('Feature %s has an unsupported domain %s.' %
                   (feature_path, domain_info))
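
A small sketch that builds a schema in memory and reads back a string domain; the feature name and values are illustrative:

```python
import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import schema_pb2

schema = schema_pb2.Schema()
feature = schema.feature.add(name='country', type=schema_pb2.BYTES)
feature.string_domain.value.extend(['US', 'CA'])

domain = tfdv.get_domain(schema, 'country')
print(list(domain.value))  # ['US', 'CA']
```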

get_feature

get_feature(
    schema: Schema,
    feature_path: Union[FeatureName, FeaturePath],
) -> Feature

Get a feature from the schema.

PARAMETER DESCRIPTION
schema

A Schema protocol buffer.

TYPE: Schema

feature_path

The path of the feature to obtain from the schema. If a FeatureName is passed, a one-step FeaturePath will be constructed and used. For example, "my_feature" -> types.FeaturePath(["my_feature"])

TYPE: Union[FeatureName, FeaturePath]

RETURNS DESCRIPTION
Feature

A Feature protocol buffer.

RAISES DESCRIPTION
TypeError

If the input schema is not of the expected type.

ValueError

If the input feature is not found in the schema.

Source code in tensorflow_data_validation/utils/schema_util.py
def get_feature(schema: schema_pb2.Schema,
                feature_path: Union[types.FeatureName, types.FeaturePath]
               ) -> schema_pb2.Feature:
  """Get a feature from the schema.

  Args:
    schema: A Schema protocol buffer.
    feature_path: The path of the feature to obtain from the schema. If a
      FeatureName is passed, a one-step FeaturePath will be constructed and
      used. For example, "my_feature" -> types.FeaturePath(["my_feature"])

  Returns:
    A Feature protocol buffer.

  Raises:
    TypeError: If the input schema is not of the expected type.
    ValueError: If the input feature is not found in the schema.
  """
  if not isinstance(schema, schema_pb2.Schema):
    raise TypeError('schema is of type %s, should be a Schema proto.' %
                    type(schema).__name__)

  if not isinstance(feature_path, types.FeaturePath):
    feature_path = types.FeaturePath([feature_path])

  feature_container = schema.feature
  parent = feature_path.parent()
  if parent:
    for step in parent.steps():
      f = look_up_feature(step, feature_container)
      if f is None:
        raise ValueError('Feature %s not found in the schema.' % feature_path)
      if f.type != schema_pb2.STRUCT:
        raise ValueError(
            'Step %s in feature %s does not refer to a valid STRUCT feature' %
            (step, feature_path))
      feature_container = f.struct_domain.feature

  feature = look_up_feature(feature_path.steps()[-1], feature_container)
  if feature is None:
    raise ValueError('Feature %s not found in the schema.' % feature_path)
  return feature
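
A small sketch; the schema and feature name are illustrative. Note that the returned Feature is a reference into the schema, so in-place edits are reflected in the schema proto:

```python
import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import schema_pb2

schema = schema_pb2.Schema()
schema.feature.add(name='age', type=schema_pb2.INT)

feature = tfdv.get_feature(schema, 'age')
# Require the feature to be present in every example.
feature.presence.min_fraction = 1.0
```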

get_feature_stats

get_feature_stats(
    stats: DatasetFeatureStatistics,
    feature_path: FeaturePath,
) -> FeatureNameStatistics

Get feature statistics from the dataset statistics.

PARAMETER DESCRIPTION
stats

A DatasetFeatureStatistics protocol buffer.

TYPE: DatasetFeatureStatistics

feature_path

The path of the feature whose statistics to obtain from the dataset statistics.

TYPE: FeaturePath

RETURNS DESCRIPTION
FeatureNameStatistics

A FeatureNameStatistics protocol buffer.

RAISES DESCRIPTION
TypeError

If the input statistics is not of the expected type.

ValueError

If the input feature is not found in the dataset statistics.

Source code in tensorflow_data_validation/utils/stats_util.py
def get_feature_stats(stats: statistics_pb2.DatasetFeatureStatistics,
                      feature_path: types.FeaturePath
                     ) -> statistics_pb2.FeatureNameStatistics:
  """Get feature statistics from the dataset statistics.

  Args:
    stats: A DatasetFeatureStatistics protocol buffer.
    feature_path: The path of the feature whose statistics to obtain from the
      dataset statistics.

  Returns:
    A FeatureNameStatistics protocol buffer.

  Raises:
    TypeError: If the input statistics is not of the expected type.
    ValueError: If the input feature is not found in the dataset statistics.
  """
  if not isinstance(stats, statistics_pb2.DatasetFeatureStatistics):
    raise TypeError('statistics is of type %s, should be a '
                    'DatasetFeatureStatistics proto.' %
                    type(stats).__name__)

  for feature_stats in stats.features:
    if feature_path == types.FeaturePath.from_proto(feature_stats.path):
      return feature_stats

  raise ValueError('Feature %s not found in the dataset statistics.' %
                   feature_path)

get_match_stats_dataframe

get_match_stats_dataframe(
    match_stats: MatchStats,
) -> DataFrame

Formats MatchStats as a pandas dataframe.

Source code in tensorflow_data_validation/utils/display_util.py
def get_match_stats_dataframe(
    match_stats: feature_skew_results_pb2.MatchStats,
) -> pd.DataFrame:
  """Formats MatchStats as a pandas dataframe."""
  return pd.DataFrame.from_dict({
      'base_with_id_count': [match_stats.base_with_id_count],
      'test_with_id_count': [match_stats.test_with_id_count],
      'identifiers_count': [match_stats.identifiers_count],
      'ids_missing_in_base_count': [match_stats.ids_missing_in_base_count],
      'ids_missing_in_test_count': [match_stats.ids_missing_in_test_count],
      'matching_pairs_count': [match_stats.matching_pairs_count],
      'base_missing_id_count': [match_stats.base_missing_id_count],
      'test_missing_id_count': [match_stats.test_missing_id_count],
      'duplicate_id_count': [match_stats.duplicate_id_count],
  })

get_skew_result_dataframe

get_skew_result_dataframe(
    skew_results: Iterable[FeatureSkew],
) -> DataFrame

Formats FeatureSkew results as a pandas dataframe.

Source code in tensorflow_data_validation/utils/display_util.py
def get_skew_result_dataframe(
    skew_results: Iterable[feature_skew_results_pb2.FeatureSkew],
) -> pd.DataFrame:
  """Formats FeatureSkew results as a pandas dataframe."""
  result = []
  for feature_skew in skew_results:
    result.append((
        feature_skew.feature_name,
        feature_skew.base_count,
        feature_skew.test_count,
        feature_skew.match_count,
        feature_skew.base_only,
        feature_skew.test_only,
        feature_skew.mismatch_count,
        feature_skew.diff_count,
    ))
  # Preserve deterministic order from the proto.
  columns = [
      'feature_name',
      'base_count',
      'test_count',
      'match_count',
      'base_only',
      'test_only',
      'mismatch_count',
      'diff_count',
  ]
  return (
      pd.DataFrame(result, columns=columns)
      .sort_values('feature_name')
      .reset_index(drop=True)
  )

get_slice_stats

get_slice_stats(
    stats: DatasetFeatureStatisticsList, slice_key: Text
) -> DatasetFeatureStatisticsList

Get statistics associated with a specific slice.

PARAMETER DESCRIPTION
stats

A DatasetFeatureStatisticsList protocol buffer.

TYPE: DatasetFeatureStatisticsList

slice_key

Slice key of the slice.

TYPE: Text

RETURNS DESCRIPTION
DatasetFeatureStatisticsList

Statistics of the specific slice.

RAISES DESCRIPTION
ValueError

If the input statistics proto does not have the specified slice statistics.

Source code in tensorflow_data_validation/utils/stats_util.py
def get_slice_stats(
    stats: statistics_pb2.DatasetFeatureStatisticsList,
    slice_key: Text) -> statistics_pb2.DatasetFeatureStatisticsList:
  """Get statistics associated with a specific slice.

  Args:
    stats: A DatasetFeatureStatisticsList protocol buffer.
    slice_key: Slice key of the slice.

  Returns:
    Statistics of the specific slice.

  Raises:
    ValueError: If the input statistics proto does not have the specified slice
      statistics.
  """
  for slice_stats in stats.datasets:
    if slice_stats.name == slice_key:
      result = statistics_pb2.DatasetFeatureStatisticsList()
      result.datasets.add().CopyFrom(slice_stats)
      return result
  raise ValueError('Invalid slice key.')

get_statistics_html

get_statistics_html(
    lhs_statistics: DatasetFeatureStatisticsList,
    rhs_statistics: Optional[
        DatasetFeatureStatisticsList
    ] = None,
    lhs_name: Text = "lhs_statistics",
    rhs_name: Text = "rhs_statistics",
    allowlist_features: Optional[List[FeaturePath]] = None,
    denylist_features: Optional[List[FeaturePath]] = None,
) -> Text

Build the HTML for visualizing the input statistics using Facets.

PARAMETER DESCRIPTION
lhs_statistics

A DatasetFeatureStatisticsList protocol buffer.

TYPE: DatasetFeatureStatisticsList

rhs_statistics

An optional DatasetFeatureStatisticsList protocol buffer to compare with lhs_statistics.

TYPE: Optional[DatasetFeatureStatisticsList] DEFAULT: None

lhs_name

Name to use for the lhs_statistics dataset if a name is not already provided within the protocol buffer.

TYPE: Text DEFAULT: 'lhs_statistics'

rhs_name

Name to use for the rhs_statistics dataset if a name is not already provided within the protocol buffer.

TYPE: Text DEFAULT: 'rhs_statistics'

allowlist_features

Set of features to be visualized.

TYPE: Optional[List[FeaturePath]] DEFAULT: None

denylist_features

Set of features to ignore for visualization.

TYPE: Optional[List[FeaturePath]] DEFAULT: None

RETURNS DESCRIPTION
Text

HTML to be embedded for visualization.

RAISES DESCRIPTION
TypeError

If the input argument is not of the expected type.

ValueError

If the input statistics protos does not have only one dataset.

Source code in tensorflow_data_validation/utils/display_util.py
def get_statistics_html(
    lhs_statistics: statistics_pb2.DatasetFeatureStatisticsList,
    rhs_statistics: Optional[
        statistics_pb2.DatasetFeatureStatisticsList
    ] = None,
    lhs_name: Text = 'lhs_statistics',
    rhs_name: Text = 'rhs_statistics',
    allowlist_features: Optional[List[types.FeaturePath]] = None,
    denylist_features: Optional[List[types.FeaturePath]] = None,
) -> Text:
  """Build the HTML for visualizing the input statistics using Facets.

  Args:
    lhs_statistics: A DatasetFeatureStatisticsList protocol buffer.
    rhs_statistics: An optional DatasetFeatureStatisticsList protocol buffer to
      compare with lhs_statistics.
    lhs_name: Name to use for the lhs_statistics dataset if a name is not
      already provided within the protocol buffer.
    rhs_name: Name to use for the rhs_statistics dataset if a name is not
      already provided within the protocol buffer.
    allowlist_features: Set of features to be visualized.
    denylist_features: Set of features to ignore for visualization.

  Returns:
    HTML to be embedded for visualization.

  Raises:
    TypeError: If the input argument is not of the expected type.
    ValueError: If the input statistics protos does not have only one dataset.
  """
  combined_statistics = _get_combined_statistics(
      lhs_statistics,
      rhs_statistics,
      lhs_name,
      rhs_name,
      allowlist_features,
      denylist_features,
  )
  if (
      len(combined_statistics.datasets) == 1
      and combined_statistics.datasets[0].num_examples == 0
  ):
    return '<p>Empty dataset.</p>'

  protostr = base64.b64encode(combined_statistics.SerializeToString()).decode(
      'utf-8'
  )

  # pylint: disable=line-too-long,anomalous-backslash-in-string
  # Note that in the html template we currently assign a temporary id to the
  # facets element and then remove it once we have appended the serialized proto
  # string to the element. We do this to avoid any collision of ids when
  # displaying multiple facets output in the notebook.
  #
  # Note that a string literal including '</script>' in a <script> tag needs to
  # escape it as <\/script> to avoid early closing the wrapping <script> tag.
  html_template = """<iframe id='facets-iframe' width="100%" height="500px"></iframe>
        <script>
        facets_iframe = document.getElementById('facets-iframe');
        facets_html = '<script src="https://cdnjs.cloudflare.com/ajax/libs/webcomponentsjs/1.3.3/webcomponents-lite.js"><\/script><link rel="import" href="https://raw.githubusercontent.com/PAIR-code/facets/master/facets-dist/facets-jupyter.html"><facets-overview proto-input="protostr"></facets-overview>';
        facets_iframe.srcdoc = facets_html;
         facets_iframe.id = "";
         setTimeout(() => {
           facets_iframe.setAttribute('height', facets_iframe.contentWindow.document.body.offsetHeight + 'px')
         }, 1500)
         </script>"""
  # pylint: enable=line-too-long
  html = html_template.replace('protostr', protostr)

  return html
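
A usage sketch that saves the Facets view as a standalone HTML file; the paths are placeholders:

```python
import tensorflow_data_validation as tfdv

stats = tfdv.load_statistics('/tmp/train_stats.tfrecord')
html = tfdv.get_statistics_html(stats, lhs_name='TRAIN')
with open('/tmp/train_stats.html', 'w') as f:
  f.write(html)
```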

infer_schema

infer_schema(
    statistics: DatasetFeatureStatisticsList,
    infer_feature_shape: bool = True,
    max_string_domain_size: int = 100,
    schema_transformations: Optional[
        List[
            Callable[
                [Schema, DatasetFeatureStatistics], Schema
            ]
        ]
    ] = None,
) -> Schema

Infers schema from the input statistics.

PARAMETER DESCRIPTION
statistics

A DatasetFeatureStatisticsList protocol buffer. Schema inference is currently supported only for lists with a single DatasetFeatureStatistics proto or lists with multiple DatasetFeatureStatistics protos corresponding to data slices that include the default slice (i.e., the slice with all examples). If a list with multiple DatasetFeatureStatistics protos is used, this function will infer the schema from the statistics corresponding to the default slice.

TYPE: DatasetFeatureStatisticsList

infer_feature_shape

A boolean to indicate if shape of the features need to be inferred from the statistics.

TYPE: bool DEFAULT: True

max_string_domain_size

Maximum size of the domain of a string feature in order to be interpreted as a categorical feature.

TYPE: int DEFAULT: 100

schema_transformations

List of transformation functions to apply to the auto-inferred schema. Each transformation function should take the schema and statistics as input and should return the transformed schema. The transformations are applied in the order provided in the list.

TYPE: Optional[List[Callable[[Schema, DatasetFeatureStatistics], Schema]]] DEFAULT: None

RETURNS DESCRIPTION
Schema

A Schema protocol buffer.

RAISES DESCRIPTION
TypeError

If the input argument is not of the expected type.

ValueError

If the input statistics proto contains multiple datasets, none of which corresponds to the default slice.

Source code in tensorflow_data_validation/api/validation_api.py
def infer_schema(
    statistics: statistics_pb2.DatasetFeatureStatisticsList,
    infer_feature_shape: bool = True,
    max_string_domain_size: int = 100,
    schema_transformations: Optional[List[
        Callable[[schema_pb2.Schema, statistics_pb2.DatasetFeatureStatistics],
                 schema_pb2.Schema]]] = None
) -> schema_pb2.Schema:
  """Infers schema from the input statistics.

  Args:
    statistics: A DatasetFeatureStatisticsList protocol buffer. Schema inference
      is currently supported only for lists with a single
      DatasetFeatureStatistics proto or lists with multiple
      DatasetFeatureStatistics protos corresponding to data slices that include
      the default slice (i.e., the slice with all examples). If a list with
      multiple DatasetFeatureStatistics protos is used, this function will infer
      the schema from the statistics corresponding to the default slice.
    infer_feature_shape: A boolean to indicate if shape of the features need to
      be inferred from the statistics.
    max_string_domain_size: Maximum size of the domain of a string feature in
        order to be interpreted as a categorical feature.
    schema_transformations: List of transformation functions to apply to the
        auto-inferred schema. Each transformation function should take the
        schema and statistics as input and should return the transformed schema.
        The transformations are applied in the order provided in the list.

  Returns:
    A Schema protocol buffer.

  Raises:
    TypeError: If the input argument is not of the expected type.
    ValueError: If the input statistics proto contains multiple datasets, none
        of which corresponds to the default slice.
  """
  if not isinstance(statistics, statistics_pb2.DatasetFeatureStatisticsList):
    raise TypeError(
        'statistics is of type %s, should be '
        'a DatasetFeatureStatisticsList proto.' % type(statistics).__name__)

  # This will raise an exception if there are multiple datasets, none of which
  # corresponds to the default slice.
  dataset_statistics = _get_default_dataset_statistics(statistics)

  # dataset_statistics may include stats for composite features like
  # SparseFeatures and WeightedFeatures. We cannot infer a useful schema from
  # these stats, so we remove them at the start.
  dataset_statistics = _remove_features_missing_common_stats(dataset_statistics)

  schema_proto_string = pywrap_tensorflow_data_validation.InferSchema(
      tf.compat.as_bytes(dataset_statistics.SerializeToString()),
      max_string_domain_size, infer_feature_shape)

  # Parse the serialized Schema proto.
  result = schema_pb2.Schema()
  result.ParseFromString(schema_proto_string)

  _may_be_set_legacy_flag(result)

  if schema_transformations is not None:
    for transformation_fn in schema_transformations:
      result = transformation_fn(result, statistics.datasets[0])
  return result
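
A usage sketch; the statistics and schema paths are placeholders:

```python
import tensorflow_data_validation as tfdv

stats = tfdv.load_statistics('/tmp/train_stats.tfrecord')
schema = tfdv.infer_schema(
    stats, infer_feature_shape=True, max_string_domain_size=100)
tfdv.display_schema(schema)
tfdv.write_schema_text(schema, '/tmp/schema.pbtxt')
```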

load_anomalies_text

load_anomalies_text(input_path: Text) -> Anomalies

Loads the Anomalies proto stored in text format in the input path.

PARAMETER DESCRIPTION
input_path

File path from which to load the Anomalies proto.

TYPE: Text

RETURNS DESCRIPTION
Anomalies

An Anomalies protocol buffer.

Source code in tensorflow_data_validation/utils/anomalies_util.py
def load_anomalies_text(input_path: Text) -> anomalies_pb2.Anomalies:
  """Loads the Anomalies proto stored in text format in the input path.

  Args:
    input_path: File path from which to load the Anomalies proto.

  Returns:
    An Anomalies protocol buffer.
  """
  anomalies = anomalies_pb2.Anomalies()
  anomalies_text = io_util.read_file_to_string(input_path)
  text_format.Parse(anomalies_text, anomalies)
  return anomalies

load_schema_text

load_schema_text(input_path: Text) -> Schema

Loads the schema stored in text format in the input path.

PARAMETER DESCRIPTION
input_path

File path to load the schema from.

TYPE: Text

RETURNS DESCRIPTION
Schema

A Schema protocol buffer.

Source code in tensorflow_data_validation/utils/schema_util.py
def load_schema_text(input_path: Text) -> schema_pb2.Schema:
  """Loads the schema stored in text format in the input path.

  Args:
    input_path: File path to load the schema from.

  Returns:
    A Schema protocol buffer.
  """
  schema = schema_pb2.Schema()
  schema_text = io_util.read_file_to_string(input_path)
  text_format.Parse(schema_text, schema)
  return schema

load_sharded_statistics

load_sharded_statistics(
    input_path_prefix: Optional[str] = None,
    input_paths: Optional[Iterable[str]] = None,
    io_provider: Optional[StatisticsIOProvider] = None,
) -> DatasetListView

Read a sharded DatasetFeatureStatisticsList from disk as a DatasetListView.

PARAMETER DESCRIPTION
input_path_prefix

If passed, loads files starting with this prefix and ending with a pattern corresponding to the output of the provided io_provider.

TYPE: Optional[str] DEFAULT: None

input_paths

A list of file paths of files containing sharded DatasetFeatureStatisticsList protos.

TYPE: Optional[Iterable[str]] DEFAULT: None

io_provider

Optional StatisticsIOProvider. If unset, a default will be constructed.

TYPE: Optional[StatisticsIOProvider] DEFAULT: None

RETURNS DESCRIPTION
DatasetListView

A DatasetListView containing the merged proto.

Source code in tensorflow_data_validation/utils/stats_util.py
def load_sharded_statistics(
    input_path_prefix: Optional[str] = None,
    input_paths: Optional[Iterable[str]] = None,
    io_provider: Optional[artifacts_io_impl.StatisticsIOProvider] = None
) -> DatasetListView:
  """Read a sharded DatasetFeatureStatisticsList from disk as a DatasetListView.

  Args:
    input_path_prefix: If passed, loads files starting with this prefix and
      ending with a pattern corresponding to the output of the provided
        io_provider.
    input_paths: A list of file paths of files containing sharded
      DatasetFeatureStatisticsList protos.
    io_provider: Optional StatisticsIOProvider. If unset, a default will be
      constructed.

  Returns:
    A DatasetListView containing the merged proto.
  """
  if input_path_prefix is None == input_paths is None:
    raise ValueError('Must provide one of input_paths_prefix, input_paths.')
  if io_provider is None:
    io_provider = artifacts_io_impl.get_io_provider()
  if input_path_prefix is not None:
    input_paths = io_provider.glob(input_path_prefix)
  if not input_paths:
    raise ValueError('No input paths found paths=%s, pattern=%s' %
                     (input_paths, input_path_prefix))
  acc = statistics.DatasetListAccumulator()
  stats_iter = io_provider.record_iterator_impl(input_paths)
  for stats_list in stats_iter:
    for dataset in stats_list.datasets:
      acc.MergeDatasetFeatureStatistics(dataset.SerializeToString())
  stats = statistics_pb2.DatasetFeatureStatisticsList()
  stats.ParseFromString(acc.Get())
  return DatasetListView(stats)
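
A usage sketch, assuming statistics were previously written in sharded form under the given prefix (the prefix is a placeholder):

```python
import tensorflow_data_validation as tfdv

view = tfdv.load_sharded_statistics(
    input_path_prefix='/tmp/sharded_stats')
# The returned DatasetListView wraps the merged
# DatasetFeatureStatisticsList proto.
merged_stats = view.proto()
```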

load_statistics

load_statistics(
    input_path: Text,
) -> DatasetFeatureStatisticsList

Loads data statistics proto from file.

PARAMETER DESCRIPTION
input_path

Data statistics file path. The file should be a one-record TFRecord file or a plain file containing the statistics proto in Proto Text Format.

TYPE: Text

RETURNS DESCRIPTION
DatasetFeatureStatisticsList

A DatasetFeatureStatisticsList proto.

RAISES DESCRIPTION
IOError

If the input path does not exist.

Source code in tensorflow_data_validation/utils/stats_util.py
def load_statistics(
    input_path: Text) -> statistics_pb2.DatasetFeatureStatisticsList:
  """Loads data statistics proto from file.

  Args:
    input_path: Data statistics file path. The file should be a one-record
      TFRecord file or a plain file containing the statistics proto in Proto
      Text Format.

  Returns:
    A DatasetFeatureStatisticsList proto.

  Raises:
    IOError: If the input path does not exist.
  """
  if not tf.io.gfile.exists(input_path):
    raise IOError('Invalid input path {}.'.format(input_path))
  try:
    return load_stats_tfrecord(input_path)
  except Exception:  # pylint: disable=broad-except
    logging.info('File %s did not look like a TFRecord. Try reading as a plain '
                 'file.', input_path)
    return load_stats_text(input_path)

load_stats_binary

load_stats_binary(
    input_path: Text,
) -> DatasetFeatureStatisticsList

Loads a serialized DatasetFeatureStatisticsList proto from a file.

PARAMETER DESCRIPTION
input_path

File path from which to load the DatasetFeatureStatisticsList proto.

TYPE: Text

RETURNS DESCRIPTION
DatasetFeatureStatisticsList

A DatasetFeatureStatisticsList proto.

Source code in tensorflow_data_validation/utils/stats_util.py
def load_stats_binary(
    input_path: Text) -> statistics_pb2.DatasetFeatureStatisticsList:
  """Loads a serialized DatasetFeatureStatisticsList proto from a file.

  Args:
    input_path: File path from which to load the DatasetFeatureStatisticsList
      proto.

  Returns:
    A DatasetFeatureStatisticsList proto.
  """
  stats_proto = statistics_pb2.DatasetFeatureStatisticsList()
  stats_proto.ParseFromString(io_util.read_file_to_string(
      input_path, binary_mode=True))
  return stats_proto

load_stats_text

load_stats_text(
    input_path: Text,
) -> DatasetFeatureStatisticsList

Loads the specified DatasetFeatureStatisticsList proto stored in text format.

PARAMETER DESCRIPTION
input_path

File path from which to load the DatasetFeatureStatisticsList proto.

TYPE: Text

RETURNS DESCRIPTION
DatasetFeatureStatisticsList

A DatasetFeatureStatisticsList proto.

Source code in tensorflow_data_validation/utils/stats_util.py
def load_stats_text(
    input_path: Text) -> statistics_pb2.DatasetFeatureStatisticsList:
  """Loads the specified DatasetFeatureStatisticsList proto stored in text format.

  Args:
    input_path: File path from which to load the DatasetFeatureStatisticsList
      proto.

  Returns:
    A DatasetFeatureStatisticsList proto.
  """
  stats_proto = statistics_pb2.DatasetFeatureStatisticsList()
  stats_text = io_util.read_file_to_string(input_path)
  text_format.Parse(stats_text, stats_proto)
  return stats_proto
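
A minimal sketch; the path is hypothetical and is assumed to hold a text-format DatasetFeatureStatisticsList (for example, one written with write_stats_text):

```python
import tensorflow_data_validation as tfdv

stats = tfdv.load_stats_text('/tmp/train_stats.pbtxt')
```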

set_domain

set_domain(
    schema: Schema, feature_path: FeaturePath, domain: Any
) -> None

Sets the domain for the input feature in the schema.

If the input feature already has a domain, it is overwritten with the newly provided input domain. This method cannot be used to add a new global domain.

PARAMETER DESCRIPTION
schema

A Schema protocol buffer.

TYPE: Schema

feature_path

The name of the feature whose domain needs to be set. If a FeatureName is passed, a one-step FeaturePath will be constructed and used. For example, "my_feature" -> types.FeaturePath(["my_feature"])

TYPE: FeaturePath

domain

A domain protocol buffer or the name of a global string domain present in the input schema.

TYPE: Any

Example:

```python
>>> from tensorflow_metadata.proto.v0 import schema_pb2
>>> import tensorflow_data_validation as tfdv
>>> schema = schema_pb2.Schema()
>>> schema.feature.add(name='feature')

# Setting an int domain.
>>> int_domain = schema_pb2.IntDomain(min=3, max=5)
>>> tfdv.set_domain(schema, "feature", int_domain)

# Setting a string domain.
>>> str_domain = schema_pb2.StringDomain(value=['one', 'two', 'three'])
>>> tfdv.set_domain(schema, "feature", str_domain)
```

RAISES DESCRIPTION
TypeError

If the input schema or the domain is not of the expected type.

ValueError

If an invalid global string domain is provided as input.

Source code in tensorflow_data_validation/utils/schema_util.py
def set_domain(schema: schema_pb2.Schema, feature_path: types.FeaturePath,
               domain: Any) -> None:
  """Sets the domain for the input feature in the schema.

  If the input feature already has a domain, it is overwritten with the newly
  provided input domain. This method cannot be used to add a new global domain.

  Args:
    schema: A Schema protocol buffer.
    feature_path: The name of the feature whose domain needs to be set. If a
      FeatureName is passed, a one-step FeaturePath will be constructed and
      used. For example, "my_feature" -> types.FeaturePath(["my_feature"])
    domain: A domain protocol buffer or the name of a global string domain
      present in the input schema.
  Example:

  ```python
  >>> from tensorflow_metadata.proto.v0 import schema_pb2
  >>> import tensorflow_data_validation as tfdv
  >>> schema = schema_pb2.Schema()
  >>> schema.feature.add(name='feature')

  # Setting an int domain.
  >>> int_domain = schema_pb2.IntDomain(min=3, max=5)
  >>> tfdv.set_domain(schema, "feature", int_domain)

  # Setting a string domain.
  >>> str_domain = schema_pb2.StringDomain(value=['one', 'two', 'three'])
  >>> tfdv.set_domain(schema, "feature", str_domain)
  ```

  Raises:
    TypeError: If the input schema or the domain is not of the expected type.
    ValueError: If an invalid global string domain is provided as input.
  """
  if not isinstance(schema, schema_pb2.Schema):
    raise TypeError('schema is of type %s, should be a Schema proto.' %
                    type(schema).__name__)

  # Find all fields types and names within domain_info.
  feature_domains = {}
  for f in schema_pb2.Feature.DESCRIPTOR.oneofs_by_name['domain_info'].fields:
    if f.message_type is not None:
      feature_domains[getattr(schema_pb2, f.message_type.name)] = f.name
    elif f.type == descriptor.FieldDescriptor.TYPE_STRING:
      feature_domains[str] = f.name
    else:
      raise TypeError('Unexpected type within schema.Features.domain_info')
  if not isinstance(domain, tuple(feature_domains.keys())):
    raise TypeError('domain is of type %s, should be one of the supported types'
                    ' in schema.Features.domain_info' % type(domain).__name__)

  feature = get_feature(schema, feature_path)
  if feature.type == schema_pb2.STRUCT:
    raise TypeError('Could not set the domain of a STRUCT feature %s.' %
                    feature_path)

  if feature.WhichOneof('domain_info') is not None:
    logging.warning('Replacing existing domain of feature "%s".', feature_path)

  for d_type, d_name in feature_domains.items():
    if isinstance(domain, d_type):
      if d_type == str:
        found_domain = False
        for global_domain in schema.string_domain:
          if global_domain.name == domain:
            found_domain = True
            break
        if not found_domain:
          raise ValueError('Invalid global string domain "{}".'.format(domain))
        feature.domain = domain
      else:
        getattr(feature, d_name).CopyFrom(domain)

update_schema

update_schema(
    schema: Schema,
    statistics: DatasetFeatureStatisticsList,
    infer_feature_shape: Optional[bool] = True,
    max_string_domain_size: Optional[int] = 100,
) -> Schema

Updates input schema to conform to the input statistics.

PARAMETER DESCRIPTION
schema

A Schema protocol buffer.

TYPE: Schema

statistics

A DatasetFeatureStatisticsList protocol buffer. Schema inference is currently supported only for lists with a single DatasetFeatureStatistics proto or lists with multiple DatasetFeatureStatistics protos corresponding to data slices that include the default slice (i.e., the slice with all examples). If a list with multiple DatasetFeatureStatistics protos is used, this function will update the schema to conform to the statistics corresponding to the default slice.

TYPE: DatasetFeatureStatisticsList

infer_feature_shape

DEPRECATED, do not use. If a feature specifies a shape, the shape will always be validated. If the feature does not specify a shape, this function will not try inferring a shape from the given statistics.

TYPE: Optional[bool] DEFAULT: True

max_string_domain_size

Maximum size of the domain of a string feature for it to be interpreted as a categorical feature.

TYPE: Optional[int] DEFAULT: 100

RETURNS DESCRIPTION
Schema

A Schema protocol buffer.

RAISES DESCRIPTION
TypeError

If the input argument is not of the expected type.

ValueError

If the input statistics proto contains multiple datasets, none of which corresponds to the default slice.

Source code in tensorflow_data_validation/api/validation_api.py
def update_schema(schema: schema_pb2.Schema,
                  statistics: statistics_pb2.DatasetFeatureStatisticsList,
                  infer_feature_shape: Optional[bool] = True,
                  max_string_domain_size: Optional[int] = 100
                 ) -> schema_pb2.Schema:
  """Updates input schema to conform to the input statistics.

  Args:
    schema: A Schema protocol buffer.
    statistics: A DatasetFeatureStatisticsList protocol buffer. Schema inference
      is currently supported only for lists with a single
      DatasetFeatureStatistics proto or lists with multiple
      DatasetFeatureStatistics protos corresponding to data slices that include
      the default slice (i.e., the slice with all examples). If a list with
      multiple DatasetFeatureStatistics protos is used, this function will
      update the schema to conform to the statistics corresponding to the
      default slice.
    infer_feature_shape: DEPRECATED, do not use. If a feature specifies
      a shape, the shape will always be validated. If the feature does not
      specify a shape, this function will not try inferring a shape from the
      given statistics.
    max_string_domain_size: Maximum size of the domain of a string feature in
      order to be interpreted as a categorical feature.

  Returns:
    A Schema protocol buffer.

  Raises:
    TypeError: If the input argument is not of the expected type.
    ValueError: If the input statistics proto contains multiple datasets, none
        of which corresponds to the default slice.
  """
  del infer_feature_shape

  if not isinstance(schema, schema_pb2.Schema):
    raise TypeError('schema is of type %s, should be a Schema proto.' %
                    type(schema).__name__)
  if not isinstance(statistics, statistics_pb2.DatasetFeatureStatisticsList):
    raise TypeError(
        'statistics is of type %s, should be '
        'a DatasetFeatureStatisticsList proto.' % type(statistics).__name__)

  # This will raise an exception if there are multiple datasets, none of which
  # corresponds to the default slice.
  dataset_statistics = _get_default_dataset_statistics(statistics)

  schema_proto_string = pywrap_tensorflow_data_validation.UpdateSchema(
      tf.compat.as_bytes(schema.SerializeToString()),
      tf.compat.as_bytes(dataset_statistics.SerializeToString()),
      max_string_domain_size)

  # Parse the serialized Schema proto.
  result = schema_pb2.Schema()
  result.ParseFromString(schema_proto_string)

  return result
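
For illustration, a sketch of revising an existing schema with statistics from a newer dataset; the paths are hypothetical, and load_schema_text is assumed to be available alongside the functions documented here:

```python
import tensorflow_data_validation as tfdv

schema = tfdv.load_schema_text('/tmp/schema.pbtxt')           # existing schema
new_stats = tfdv.load_statistics('/tmp/day2_stats.tfrecord')  # fresh statistics

# Relax the schema so that it conforms to the new statistics.
schema = tfdv.update_schema(schema, new_stats, max_string_domain_size=100)
```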

validate_corresponding_slices

validate_corresponding_slices(
    statistics: DatasetFeatureStatisticsList,
    schema: Schema,
    environment: Optional[Text] = None,
    previous_statistics: Optional[
        DatasetFeatureStatisticsList
    ] = None,
    serving_statistics: Optional[
        DatasetFeatureStatisticsList
    ] = None,
) -> Anomalies

Validates corresponding sliced statistics.

Sliced statistics are flattened into a single unsliced stats input prior to validation. If multiple statistics are provided, validation is performed on corresponding slices. DatasetConstraints, if present, are applied to the overall slice.

Note: This API is experimental and subject to change.

PARAMETER DESCRIPTION
statistics

See validate_statistics.

TYPE: DatasetFeatureStatisticsList

schema

See validate_statistics.

TYPE: Schema

environment

See validate_statistics.

TYPE: Optional[Text] DEFAULT: None

previous_statistics

See validate_statistics.

TYPE: Optional[DatasetFeatureStatisticsList] DEFAULT: None

serving_statistics

See validate_statistics.

TYPE: Optional[DatasetFeatureStatisticsList] DEFAULT: None

RETURNS DESCRIPTION
Anomalies

An Anomalies protocol buffer.

RAISES DESCRIPTION
TypeError

If any of the input arguments is not of the expected type.

Source code in tensorflow_data_validation/api/validation_api.py
def validate_corresponding_slices(
    statistics: statistics_pb2.DatasetFeatureStatisticsList,
    schema: schema_pb2.Schema,
    environment: Optional[Text] = None,
    previous_statistics: Optional[
        statistics_pb2.DatasetFeatureStatisticsList] = None,
    serving_statistics: Optional[
        statistics_pb2.DatasetFeatureStatisticsList] = None,
) -> anomalies_pb2.Anomalies:
  """Validates corresponding sliced statistics.

  Sliced statistics are flattened into a single unsliced stats input prior to
  validation. If multiple statistics are provided, validation is performed on
  corresponding slices. DatasetConstraints, if present, are applied to the
  overall slice.

  Note: This API is experimental and subject to change.

  Args:
    statistics: See validate_statistics.
    schema: See validate_statistics.
    environment: See validate_statistics.
    previous_statistics: See validate_statistics.
    serving_statistics: See validate_statistics.

  Returns:
    An Anomalies protocol buffer.

  Raises:
    TypeError: If any of the input arguments is not of the expected type.
  """
  all_slice_keys = set()
  statistics, keys = _flatten_statistics_for_sliced_validation(statistics)
  all_slice_keys.update(keys)
  if previous_statistics:
    previous_statistics, keys = _flatten_statistics_for_sliced_validation(
        previous_statistics)
    all_slice_keys.update(keys)
  if serving_statistics:
    serving_statistics, keys = _flatten_statistics_for_sliced_validation(
        serving_statistics)
    all_slice_keys.update(keys)
  schema = _replicate_schema_for_sliced_validation(schema, all_slice_keys)
  return validate_statistics(statistics, schema, environment,
                             previous_statistics, serving_statistics)
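
For illustration, a sketch of validating sliced statistics against a previous dataset; all paths are hypothetical, and the inputs are assumed to have been generated with slicing enabled:

```python
import tensorflow_data_validation as tfdv

current = tfdv.load_statistics('/tmp/current_sliced_stats.tfrecord')
previous = tfdv.load_statistics('/tmp/previous_sliced_stats.tfrecord')
schema = tfdv.load_schema_text('/tmp/schema.pbtxt')

# Validates each slice in `current` against the corresponding slice in `previous`.
anomalies = tfdv.validate_corresponding_slices(
    statistics=current, schema=schema, previous_statistics=previous)
```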

validate_examples_in_csv

validate_examples_in_csv(
    data_location: Text,
    stats_options: StatsOptions,
    column_names: Optional[List[FeatureName]] = None,
    delimiter: Text = ",",
    output_path: Optional[Text] = None,
    pipeline_options: Optional[PipelineOptions] = None,
    num_sampled_examples=0,
) -> Union[
    DatasetFeatureStatisticsList,
    Tuple[
        DatasetFeatureStatisticsList,
        Mapping[str, DataFrame],
    ],
]

Validates examples in csv files.

Runs a Beam pipeline to detect anomalies on a per-example basis. If this function detects anomalous examples, it generates summary statistics regarding the set of examples that exhibit each anomaly.

This is a convenience function for users with data in CSV format. Users with data in unsupported file/data formats, or users who wish to create their own Beam pipelines need to use the 'IdentifyAnomalousExamples' PTransform API directly instead.

PARAMETER DESCRIPTION
data_location

The location of the input data files.

TYPE: Text

stats_options

tfdv.StatsOptions for generating data statistics. This must contain a schema.

TYPE: StatsOptions

column_names

A list of column names to be treated as the CSV header. Order must match the order in the input CSV files. If this argument is not specified, the first line in the input CSV files is assumed to be the header. Note that this option is valid only for the 'csv' input file format.

TYPE: Optional[List[FeatureName]] DEFAULT: None

delimiter

A one-character string used to separate fields in a CSV file.

TYPE: Text DEFAULT: ','

output_path

The file path to which to write the data statistics result. If None, the function uses a temporary directory. The output will be a TFRecord file containing a single data statistics list proto, and can be read with the 'load_statistics' function. If you run this function on Google Cloud, you must specify an output_path. Specifying None may cause an error.

TYPE: Optional[Text] DEFAULT: None

pipeline_options

Optional beam pipeline options. This allows users to specify various beam pipeline execution parameters like pipeline runner (DirectRunner or DataflowRunner), cloud dataflow service project id, etc. See https://cloud.google.com/dataflow/pipelines/specifying-exec-params for more details.

TYPE: Optional[PipelineOptions] DEFAULT: None

num_sampled_examples

If set, returns up to this many examples of each anomaly type as a map from anomaly reason string to pd.DataFrame.

DEFAULT: 0

RETURNS DESCRIPTION
Union[DatasetFeatureStatisticsList, Tuple[DatasetFeatureStatisticsList, Mapping[str, DataFrame]]]

If num_sampled_examples is zero, returns a single DatasetFeatureStatisticsList proto in which each dataset consists of the set of examples that exhibit a particular anomaly. If num_sampled_examples is nonzero, returns the same statistics proto as well as a mapping from anomaly to a pd.DataFrame of CSV rows exhibiting that anomaly.

RAISES DESCRIPTION
ValueError

If the specified stats_options does not include a schema.

Source code in tensorflow_data_validation/utils/validation_lib.py
def validate_examples_in_csv(
    data_location: Text,
    stats_options: options.StatsOptions,
    column_names: Optional[List[types.FeatureName]] = None,
    delimiter: Text = ',',
    output_path: Optional[Text] = None,
    pipeline_options: Optional[PipelineOptions] = None,
    num_sampled_examples=0,
) -> Union[statistics_pb2.DatasetFeatureStatisticsList, Tuple[
    statistics_pb2.DatasetFeatureStatisticsList, Mapping[str, pd.DataFrame]]]:
  """Validates examples in csv files.

  Runs a Beam pipeline to detect anomalies on a per-example basis. If this
  function detects anomalous examples, it generates summary statistics regarding
  the set of examples that exhibit each anomaly.

  This is a convenience function for users with data in CSV format.
  Users with data in unsupported file/data formats, or users who wish
  to create their own Beam pipelines need to use the 'IdentifyAnomalousExamples'
  PTransform API directly instead.

  Args:
    data_location: The location of the input data files.
    stats_options: `tfdv.StatsOptions` for generating data statistics. This must
      contain a schema.
    column_names: A list of column names to be treated as the CSV header. Order
      must match the order in the input CSV files. If this argument is not
      specified, we assume the first line in the input CSV files as the header.
      Note that this option is valid only for 'csv' input file format.
    delimiter: A one-character string used to separate fields in a CSV file.
    output_path: The file path to output data statistics result to. If None, the
      function uses a temporary directory. The output will be a TFRecord file
      containing a single data statistics list proto, and can be read with the
      'load_statistics' function. If you run this function on Google Cloud, you
      must specify an output_path. Specifying None may cause an error.
    pipeline_options: Optional beam pipeline options. This allows users to
      specify various beam pipeline execution parameters like pipeline runner
      (DirectRunner or DataflowRunner), cloud dataflow service project id, etc.
      See https://cloud.google.com/dataflow/pipelines/specifying-exec-params for
        more details.
    num_sampled_examples: If set, returns up to this many examples of each
      anomaly type as a map from anomaly reason string to pd.DataFrame.

  Returns:
    If num_sampled_examples is zero, returns a single
    DatasetFeatureStatisticsList proto in which each dataset consists of the
    set of examples that exhibit a particular anomaly. If
    num_sampled_examples is nonzero, returns the same statistics
    proto as well as a mapping from anomaly to a pd.DataFrame of CSV rows
    exhibiting that anomaly.

  Raises:
    ValueError: If the specified stats_options does not include a schema.
  """
  if stats_options.schema is None:
    raise ValueError('The specified stats_options must include a schema.')
  if output_path is None:
    output_path = os.path.join(tempfile.mkdtemp(), 'anomaly_stats.tfrecord')
  output_dir_path = os.path.dirname(output_path)
  if not tf.io.gfile.exists(output_dir_path):
    tf.io.gfile.makedirs(output_dir_path)
  if num_sampled_examples:
    sample_materializer = io_util.Materializer(output_dir_path)

  # If a header is not provided, assume the first line in a file
  # to be the header.
  skip_header_lines = 1 if column_names is None else 0
  if column_names is None:
    column_names = stats_gen_lib.get_csv_header(data_location, delimiter)

  with beam.Pipeline(options=pipeline_options) as p:

    anomalous_examples = (
        p
        | 'ReadData' >> beam.io.textio.ReadFromText(
            file_pattern=data_location, skip_header_lines=skip_header_lines)
        | 'DecodeData' >> csv_decoder.DecodeCSV(
            column_names=column_names,
            delimiter=delimiter,
            schema=stats_options.schema
            if stats_options.infer_type_from_schema else None,
            desired_batch_size=1)
        | 'DetectAnomalies' >>
        validation_api.IdentifyAnomalousExamples(stats_options))
    _ = (
        anomalous_examples
        |
        'GenerateSummaryStatistics' >> stats_impl.GenerateSlicedStatisticsImpl(
            stats_options, is_slicing_enabled=True)
        |
        'WriteStatsOutput' >> stats_api.WriteStatisticsToTFRecord(output_path))
    if num_sampled_examples:
      _ = (
          anomalous_examples
          | 'Sample' >>
          beam.combiners.Sample.FixedSizePerKey(num_sampled_examples)
          | 'ToPandas' >> beam.FlatMap(_encode_pandas_and_key)
          | 'WriteSamples' >> sample_materializer.writer())

  if num_sampled_examples:
    samples_per_reason_acc = collections.defaultdict(list)
    for reason, pandas_dataframe in sample_materializer.reader():
      samples_per_reason_acc[reason].append(pandas_dataframe)
    samples_per_reason = {}
    for reason, dataframes in samples_per_reason_acc.items():
      samples_per_reason[reason] = pd.concat(dataframes)
    sample_materializer.cleanup()
    return stats_util.load_statistics(output_path), samples_per_reason
  return stats_util.load_statistics(output_path)
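
For illustration, a sketch of running example-level validation over CSV files; the paths are hypothetical, and the schema is assumed to exist already:

```python
import tensorflow_data_validation as tfdv

schema = tfdv.load_schema_text('/tmp/schema.pbtxt')
options = tfdv.StatsOptions(schema=schema)

# With num_sampled_examples > 0 the result is a (statistics, samples) tuple.
anomaly_stats, sampled_rows = tfdv.validate_examples_in_csv(
    data_location='/tmp/data/*.csv',
    stats_options=options,
    num_sampled_examples=5,
)
```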

validate_examples_in_tfrecord

validate_examples_in_tfrecord(
    data_location: Text,
    stats_options: StatsOptions,
    output_path: Optional[Text] = None,
    pipeline_options: Optional[PipelineOptions] = None,
    num_sampled_examples=0,
) -> Union[
    DatasetFeatureStatisticsList,
    Tuple[
        DatasetFeatureStatisticsList,
        Mapping[str, List[Example]],
    ],
]

Validates TFExamples in TFRecord files.

Runs a Beam pipeline to detect anomalies on a per-example basis. If this function detects anomalous examples, it generates summary statistics regarding the set of examples that exhibit each anomaly.

This is a convenience function for users with data in TFRecord format. Users with data in unsupported file/data formats, or users who wish to create their own Beam pipelines need to use the 'IdentifyAnomalousExamples' PTransform API directly instead.

PARAMETER DESCRIPTION
data_location

The location of the input data files.

TYPE: Text

stats_options

tfdv.StatsOptions for generating data statistics. This must contain a schema.

TYPE: StatsOptions

output_path

The file path to which to write the data statistics result. If None, the function uses a temporary directory. The output will be a TFRecord file containing a single data statistics list proto, and can be read with the 'load_statistics' function. If you run this function on Google Cloud, you must specify an output_path. Specifying None may cause an error.

TYPE: Optional[Text] DEFAULT: None

pipeline_options

Optional beam pipeline options. This allows users to specify various beam pipeline execution parameters like pipeline runner (DirectRunner or DataflowRunner), cloud dataflow service project id, etc. See https://cloud.google.com/dataflow/pipelines/specifying-exec-params for more details.

TYPE: Optional[PipelineOptions] DEFAULT: None

num_sampled_examples

If set, returns up to this many examples of each anomaly type as a map from anomaly reason string to a list of tf.Examples.

DEFAULT: 0

RETURNS DESCRIPTION
Union[DatasetFeatureStatisticsList, Tuple[DatasetFeatureStatisticsList, Mapping[str, List[Example]]]]

If num_sampled_examples is zero, returns a single DatasetFeatureStatisticsList proto in which each dataset consists of the set of examples that exhibit a particular anomaly. If num_sampled_examples is nonzero, returns the same statistics proto as well as a mapping from anomaly to a list of tf.Examples that exhibited that anomaly.

RAISES DESCRIPTION
ValueError

If the specified stats_options does not include a schema.

Source code in tensorflow_data_validation/utils/validation_lib.py
def validate_examples_in_tfrecord(
    data_location: Text,
    stats_options: options.StatsOptions,
    output_path: Optional[Text] = None,
    pipeline_options: Optional[PipelineOptions] = None,
    num_sampled_examples=0,
) -> Union[statistics_pb2.DatasetFeatureStatisticsList, Tuple[
    statistics_pb2.DatasetFeatureStatisticsList, Mapping[
        str, List[tf.train.Example]]]]:
  """Validates TFExamples in TFRecord files.

  Runs a Beam pipeline to detect anomalies on a per-example basis. If this
  function detects anomalous examples, it generates summary statistics regarding
  the set of examples that exhibit each anomaly.

  This is a convenience function for users with data in TFRecord format.
  Users with data in unsupported file/data formats, or users who wish
  to create their own Beam pipelines need to use the 'IdentifyAnomalousExamples'
  PTransform API directly instead.

  Args:
    data_location: The location of the input data files.
    stats_options: `tfdv.StatsOptions` for generating data statistics. This must
      contain a schema.
    output_path: The file path to output data statistics result to. If None, the
      function uses a temporary directory. The output will be a TFRecord file
      containing a single data statistics list proto, and can be read with the
      'load_statistics' function.
      If you run this function on Google Cloud, you must specify an
      output_path. Specifying None may cause an error.
    pipeline_options: Optional beam pipeline options. This allows users to
      specify various beam pipeline execution parameters like pipeline runner
      (DirectRunner or DataflowRunner), cloud dataflow service project id, etc.
      See https://cloud.google.com/dataflow/pipelines/specifying-exec-params for
      more details.
    num_sampled_examples: If set, returns up to this many examples
      of each anomaly type as a map from anomaly reason string to a list of
      tf.Examples.

  Returns:
    If num_sampled_examples is zero, returns a single
    DatasetFeatureStatisticsList proto in which each dataset consists of the
    set of examples that exhibit a particular anomaly. If
    num_sampled_examples is nonzero, returns the same statistics
    proto as well as a mapping from anomaly to a list of tf.Examples that
    exhibited that anomaly.

  Raises:
    ValueError: If the specified stats_options does not include a schema.
  """
  if stats_options.schema is None:
    raise ValueError('The specified stats_options must include a schema.')
  if output_path is None:
    output_path = os.path.join(tempfile.mkdtemp(), 'anomaly_stats.tfrecord')
  output_dir_path = os.path.dirname(output_path)
  if not tf.io.gfile.exists(output_dir_path):
    tf.io.gfile.makedirs(output_dir_path)
  with io_util.Materializer(output_dir_path) as sample_materializer:
    with beam.Pipeline(options=pipeline_options) as p:
      anomalous_examples = (
          p
          | 'ReadData' >> (tf_example_record.TFExampleRecord(
              file_pattern=data_location,
              schema=None,
              telemetry_descriptors=['tfdv', 'validate_examples_in_tfrecord'
                                    ]).BeamSource(batch_size=1))
          | 'DetectAnomalies' >>
          validation_api.IdentifyAnomalousExamples(stats_options))
      _ = (
          anomalous_examples | 'GenerateSummaryStatistics' >>
          stats_impl.GenerateSlicedStatisticsImpl(
              stats_options, is_slicing_enabled=True)
          | 'WriteStatsOutput' >>
          stats_api.WriteStatisticsToTFRecord(output_path))
      if num_sampled_examples:
        # TODO(b/68154497): Relint
        # pylint: disable=no-value-for-parameter
        _ = (
            anomalous_examples
            | 'Sample' >>
            beam.combiners.Sample.FixedSizePerKey(num_sampled_examples)
            | 'ToExample' >> _record_batch_to_example_fn(
                example_coder.RecordBatchToExamplesEncoder(
                    stats_options.schema))
            | 'WriteSamples' >> sample_materializer.writer())
        # pylint: enable=no-value-for-parameter
    if num_sampled_examples:
      samples_per_reason = collections.defaultdict(list)
      for reason, serialized_example in sample_materializer.reader():
        samples_per_reason[reason].append(
            tf.train.Example.FromString(serialized_example))
      return stats_util.load_statistics(output_path), samples_per_reason
  return stats_util.load_statistics(output_path)
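
A similar sketch for TFRecord input; the paths are hypothetical:

```python
import tensorflow_data_validation as tfdv

options = tfdv.StatsOptions(schema=tfdv.load_schema_text('/tmp/schema.pbtxt'))

# Returns only the per-anomaly statistics when num_sampled_examples is 0.
anomaly_stats = tfdv.validate_examples_in_tfrecord(
    data_location='/tmp/data/train*.tfrecord',
    stats_options=options,
)
```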

validate_statistics

validate_statistics(
    statistics: DatasetFeatureStatisticsList,
    schema: Schema,
    environment: Optional[Text] = None,
    previous_statistics: Optional[
        DatasetFeatureStatisticsList
    ] = None,
    serving_statistics: Optional[
        DatasetFeatureStatisticsList
    ] = None,
    custom_validation_config: Optional[
        CustomValidationConfig
    ] = None,
) -> Anomalies

Validates the input statistics against the provided input schema.

This method validates the statistics against the schema. If an optional environment is specified, the schema is filtered using the environment and the statistics is validated against the filtered schema. The optional previous_statistics and serving_statistics are the statistics computed over the control data for drift- and skew-detection, respectively.

If drift- or skew-detection is conducted, then the raw skew/drift measurements for each feature that is compared will be recorded in the drift_skew_info field in the returned Anomalies proto.

PARAMETER DESCRIPTION
statistics

A DatasetFeatureStatisticsList protocol buffer denoting the statistics computed over the current data. Validation is currently supported only for lists with a single DatasetFeatureStatistics proto or lists with multiple DatasetFeatureStatistics protos corresponding to data slices that include the default slice (i.e., the slice with all examples). If a list with multiple DatasetFeatureStatistics protos is used, this function will validate the statistics corresponding to the default slice.

TYPE: DatasetFeatureStatisticsList

schema

A Schema protocol buffer. Note that TFDV does not currently support validation of the following messages/fields in the Schema protocol buffer:

- FeaturePresenceWithinGroup
- Schema-level FloatDomain and IntDomain (validation is supported for Feature-level FloatDomain and IntDomain)

TYPE: Schema

environment

An optional string denoting the validation environment. Must be one of the default environments specified in the schema. By default, validation assumes that all Examples in a pipeline adhere to a single schema. In some cases introducing slight schema variations is necessary; for instance, features used as labels are required during training (and should be validated) but are missing during serving. Environments can be used to express such requirements. For example, assume a feature named 'LABEL' is required for training, but is expected to be missing from serving. This can be expressed by defining two distinct environments in the schema: ["SERVING", "TRAINING"] and associating 'LABEL' only with environment "TRAINING".

TYPE: Optional[Text] DEFAULT: None

previous_statistics

An optional DatasetFeatureStatisticsList protocol buffer denoting the statistics computed over earlier data (for example, the previous day's data). If provided, the validate_statistics method will detect whether drift exists between the current data and the previous data. Configuration for drift detection can be done by specifying a drift_comparator in the schema.

TYPE: Optional[DatasetFeatureStatisticsList] DEFAULT: None

serving_statistics

An optional DatasetFeatureStatisticsList protocol buffer denoting the statistics computed over the serving data. If provided, the validate_statistics method will identify if there exists distribution skew between current data and serving data. Configuration for skew detection can be done by specifying a skew_comparator in the schema.

TYPE: Optional[DatasetFeatureStatisticsList] DEFAULT: None

custom_validation_config

An optional config that can be used to specify custom validations to perform. If doing single-feature validations, the test feature will come from statistics and will be mapped to feature in the SQL query. If doing feature pair validations, the test feature will come from statistics and will be mapped to feature_test in the SQL query, and the base feature will come from previous_statistics and will be mapped to feature_base in the SQL query.

TYPE: Optional[CustomValidationConfig] DEFAULT: None

RETURNS DESCRIPTION
Anomalies

An Anomalies protocol buffer.

RAISES DESCRIPTION
TypeError

If any of the input arguments is not of the expected type.

ValueError

If the input statistics proto contains multiple datasets, none of which corresponds to the default slice.

Source code in tensorflow_data_validation/api/validation_api.py
def validate_statistics(
    statistics: statistics_pb2.DatasetFeatureStatisticsList,
    schema: schema_pb2.Schema,
    environment: Optional[Text] = None,
    previous_statistics: Optional[
        statistics_pb2.DatasetFeatureStatisticsList] = None,
    serving_statistics: Optional[
        statistics_pb2.DatasetFeatureStatisticsList] = None,
    custom_validation_config: Optional[
        custom_validation_config_pb2.CustomValidationConfig] = None
) -> anomalies_pb2.Anomalies:
  """Validates the input statistics against the provided input schema.

  This method validates the `statistics` against the `schema`. If an optional
  `environment` is specified, the `schema` is filtered using the
  `environment` and the `statistics` is validated against the filtered schema.
  The optional `previous_statistics` and `serving_statistics` are the statistics
  computed over the control data for drift- and skew-detection, respectively.

  If drift- or skew-detection is conducted, then the raw skew/drift measurements
  for each feature that is compared will be recorded in the `drift_skew_info`
  field in the returned `Anomalies` proto.

  Args:
    statistics: A DatasetFeatureStatisticsList protocol buffer denoting the
       statistics computed over the current data. Validation is currently
       supported only for lists with a single DatasetFeatureStatistics proto or
       lists with multiple DatasetFeatureStatistics protos corresponding to data
       slices that include the default slice (i.e., the slice with all
       examples). If a list with multiple DatasetFeatureStatistics protos is
       used, this function will validate the statistics corresponding to the
       default slice.
    schema: A Schema protocol buffer.
       Note that TFDV does not currently support validation of the following
       messages/fields in the Schema protocol buffer:
       - FeaturePresenceWithinGroup
       - Schema-level FloatDomain and IntDomain (validation is supported for
         Feature-level FloatDomain and IntDomain)
    environment: An optional string denoting the validation environment.
        Must be one of the default environments specified in the schema.
        By default, validation assumes that all Examples in a pipeline adhere
        to a single schema. In some cases introducing slight schema variations
        is necessary, for instance features used as labels are required during
        training (and should be validated), but are missing during serving.
        Environments can be used to express such requirements. For example,
        assume a feature named 'LABEL' is required for training, but is expected
        to be missing from serving. This can be expressed by defining two
        distinct environments in schema: ["SERVING", "TRAINING"] and
        associating 'LABEL' only with environment "TRAINING".
    previous_statistics: An optional DatasetFeatureStatisticsList protocol
        buffer denoting the statistics computed over an earlier data (for
        example, previous day's data). If provided, the `validate_statistics`
        method will detect if there exists drift between current data and
        previous data. Configuration for drift detection can be done by
        specifying a `drift_comparator` in the schema.
    serving_statistics: An optional DatasetFeatureStatisticsList protocol
        buffer denoting the statistics computed over the serving data. If
        provided, the `validate_statistics` method will identify if there exists
        distribution skew between current data and serving data. Configuration
        for skew detection can be done by specifying a `skew_comparator` in the
        schema.
    custom_validation_config: An optional config that can be used to specify
        custom validations to perform. If doing single-feature validations,
        the test feature will come from `statistics` and will be mapped to
        `feature` in the SQL query. If doing feature pair validations, the test
        feature will come from `statistics` and will be mapped to `feature_test`
        in the SQL query, and the base feature will come from
        `previous_statistics` and will be mapped to `feature_base` in the SQL
        query.

  Returns:
    An Anomalies protocol buffer.

  Raises:
    TypeError: If any of the input arguments is not of the expected type.
    ValueError: If the input statistics proto contains multiple datasets, none
        of which corresponds to the default slice.
  """

  # This check is added here because the arguments name for previous_statistics
  # is different in TFX::OSS and TFX internal. It is preferred to report the
  # error with the name used in the API.
  if previous_statistics is not None:
    if not isinstance(
        previous_statistics, statistics_pb2.DatasetFeatureStatisticsList):
      raise TypeError(
          'previous_statistics is of type %s, should be '
          'a DatasetFeatureStatisticsList proto.'
          % type(previous_statistics).__name__)

  return validate_statistics_internal(statistics, schema, environment,
                                      previous_statistics, serving_statistics,
                                      None, None, False,
                                      custom_validation_config)
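
For illustration, a minimal sketch of schema validation with optional drift detection; the paths are hypothetical:

```python
import tensorflow_data_validation as tfdv

train_stats = tfdv.load_statistics('/tmp/day2_stats.tfrecord')
schema = tfdv.load_schema_text('/tmp/schema.pbtxt')

# Basic validation against the schema.
anomalies = tfdv.validate_statistics(statistics=train_stats, schema=schema)

# Optionally compare against an earlier dataset to surface drift
# (requires a drift_comparator in the schema).
anomalies_with_drift = tfdv.validate_statistics(
    statistics=train_stats,
    schema=schema,
    previous_statistics=tfdv.load_statistics('/tmp/day1_stats.tfrecord'),
)
```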

visualize_statistics

visualize_statistics(
    lhs_statistics: DatasetFeatureStatisticsList,
    rhs_statistics: Optional[
        DatasetFeatureStatisticsList
    ] = None,
    lhs_name: Text = "lhs_statistics",
    rhs_name: Text = "rhs_statistics",
    allowlist_features: Optional[List[FeaturePath]] = None,
    denylist_features: Optional[List[FeaturePath]] = None,
) -> None

Visualize the input statistics using Facets.

PARAMETER DESCRIPTION
lhs_statistics

A DatasetFeatureStatisticsList protocol buffer.

TYPE: DatasetFeatureStatisticsList

rhs_statistics

An optional DatasetFeatureStatisticsList protocol buffer to compare with lhs_statistics.

TYPE: Optional[DatasetFeatureStatisticsList] DEFAULT: None

lhs_name

Name to use for the lhs_statistics dataset if a name is not already provided within the protocol buffer.

TYPE: Text DEFAULT: 'lhs_statistics'

rhs_name

Name to use for the rhs_statistics dataset if a name is not already provided within the protocol buffer.

TYPE: Text DEFAULT: 'rhs_statistics'

allowlist_features

Set of features to be visualized.

TYPE: Optional[List[FeaturePath]] DEFAULT: None

denylist_features

Set of features to ignore for visualization.

TYPE: Optional[List[FeaturePath]] DEFAULT: None

RAISES DESCRIPTION
TypeError

If the input argument is not of the expected type.

ValueError

If an input statistics proto does not contain exactly one dataset.

Source code in tensorflow_data_validation/utils/display_util.py
def visualize_statistics(
    lhs_statistics: statistics_pb2.DatasetFeatureStatisticsList,
    rhs_statistics: Optional[
        statistics_pb2.DatasetFeatureStatisticsList
    ] = None,
    lhs_name: Text = 'lhs_statistics',
    rhs_name: Text = 'rhs_statistics',
    allowlist_features: Optional[List[types.FeaturePath]] = None,
    denylist_features: Optional[List[types.FeaturePath]] = None,
) -> None:
  """Visualize the input statistics using Facets.

  Args:
    lhs_statistics: A DatasetFeatureStatisticsList protocol buffer.
    rhs_statistics: An optional DatasetFeatureStatisticsList protocol buffer to
      compare with lhs_statistics.
    lhs_name: Name to use for the lhs_statistics dataset if a name is not
      already provided within the protocol buffer.
    rhs_name: Name to use for the rhs_statistics dataset if a name is not
      already provided within the protocol buffer.
    allowlist_features: Set of features to be visualized.
    denylist_features: Set of features to ignore for visualization.

  Raises:
    TypeError: If the input argument is not of the expected type.
    ValueError: If an input statistics proto does not contain exactly one dataset.
  """
  assert (
      not allowlist_features or not denylist_features
  ), 'Only specify one of allowlist_features and denylist_features.'
  html = get_statistics_html(
      lhs_statistics,
      rhs_statistics,
      lhs_name,
      rhs_name,
      allowlist_features,
      denylist_features,
  )
  display(HTML(html))
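
For illustration, a notebook-oriented sketch comparing two datasets; the paths are hypothetical:

```python
import tensorflow_data_validation as tfdv

train_stats = tfdv.load_statistics('/tmp/train_stats.tfrecord')
eval_stats = tfdv.load_statistics('/tmp/eval_stats.tfrecord')

# Renders a Facets Overview comparison inline (e.g. in a Jupyter notebook).
tfdv.visualize_statistics(
    lhs_statistics=train_stats,
    rhs_statistics=eval_stats,
    lhs_name='TRAIN',
    rhs_name='EVAL',
)
```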

write_anomalies_text

write_anomalies_text(
    anomalies: Anomalies, output_path: Text
) -> None

Writes the Anomalies proto to a file in text format.

PARAMETER DESCRIPTION
anomalies

An Anomalies protocol buffer.

TYPE: Anomalies

output_path

File path to which to write the Anomalies proto.

TYPE: Text

RAISES DESCRIPTION
TypeError

If the input Anomalies proto is not of the expected type.

Source code in tensorflow_data_validation/utils/anomalies_util.py
def write_anomalies_text(anomalies: anomalies_pb2.Anomalies,
                         output_path: Text) -> None:
  """Writes the Anomalies proto to a file in text format.

  Args:
    anomalies: An Anomalies protocol buffer.
    output_path: File path to which to write the Anomalies proto.

  Raises:
    TypeError: If the input Anomalies proto is not of the expected type.
  """
  if not isinstance(anomalies, anomalies_pb2.Anomalies):
    raise TypeError(
        'anomalies is of type %s; should be an Anomalies proto.' %
        type(anomalies).__name__)

  anomalies_text = text_format.MessageToString(anomalies)
  io_util.write_string_to_file(output_path, anomalies_text)
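
A minimal sketch; in practice the Anomalies proto would come from validate_statistics, and the output path is hypothetical:

```python
import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import anomalies_pb2

# Placeholder proto; normally this is the output of tfdv.validate_statistics.
anomalies = anomalies_pb2.Anomalies()
tfdv.write_anomalies_text(anomalies, '/tmp/anomalies.pbtxt')
```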

write_schema_text

write_schema_text(
    schema: Schema, output_path: Text
) -> None

Writes input schema to a file in text format.

PARAMETER DESCRIPTION
schema

A Schema protocol buffer.

TYPE: Schema

output_path

File path to write the input schema.

TYPE: Text

RAISES DESCRIPTION
TypeError

If the input schema is not of the expected type.

Source code in tensorflow_data_validation/utils/schema_util.py
def write_schema_text(schema: schema_pb2.Schema, output_path: Text) -> None:
  """Writes input schema to a file in text format.

  Args:
    schema: A Schema protocol buffer.
    output_path: File path to write the input schema.

  Raises:
    TypeError: If the input schema is not of the expected type.
  """
  if not isinstance(schema, schema_pb2.Schema):
    raise TypeError('schema is of type %s, should be a Schema proto.' %
                    type(schema).__name__)

  schema_text = text_format.MessageToString(schema)
  io_util.write_string_to_file(output_path, schema_text)
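
A minimal sketch; the feature name and output path are hypothetical:

```python
import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import schema_pb2

schema = schema_pb2.Schema()
schema.feature.add(name='my_feature')
tfdv.write_schema_text(schema, '/tmp/schema.pbtxt')
# The text file can be read back with tfdv.load_schema_text, assuming it is available.
```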

write_stats_text

write_stats_text(
    stats: DatasetFeatureStatisticsList, output_path: Text
) -> None

Writes a DatasetFeatureStatisticsList proto to a file in text format.

PARAMETER DESCRIPTION
stats

A DatasetFeatureStatisticsList proto.

TYPE: DatasetFeatureStatisticsList

output_path

File path to write the DatasetFeatureStatisticsList proto.

TYPE: Text

RAISES DESCRIPTION
TypeError

If the input proto is not of the expected type.

Source code in tensorflow_data_validation/utils/stats_util.py
def write_stats_text(stats: statistics_pb2.DatasetFeatureStatisticsList,
                     output_path: Text) -> None:
  """Writes a DatasetFeatureStatisticsList proto to a file in text format.

  Args:
    stats: A DatasetFeatureStatisticsList proto.
    output_path: File path to write the DatasetFeatureStatisticsList proto.

  Raises:
    TypeError: If the input proto is not of the expected type.
  """
  if not isinstance(stats, statistics_pb2.DatasetFeatureStatisticsList):
    raise TypeError(
        'stats is of type %s, should be a '
        'DatasetFeatureStatisticsList proto.' % type(stats).__name__)

  stats_proto_text = text_format.MessageToString(stats)
  io_util.write_string_to_file(output_path, stats_proto_text)
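
A minimal round-trip sketch with load_stats_text; the dataset name and path are hypothetical:

```python
import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import statistics_pb2

stats = statistics_pb2.DatasetFeatureStatisticsList()
stats.datasets.add(name='train', num_examples=0)  # empty placeholder dataset

tfdv.write_stats_text(stats, '/tmp/train_stats.pbtxt')
reloaded = tfdv.load_stats_text('/tmp/train_stats.pbtxt')
```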