TensorFlow Transform tft.beam.analyzer_cache Module

tensorflow_transform.beam.analyzer_cache

Module which allows a pipeline to define and utilize cached analyzers.

Attributes

BeamAnalysisCache module-attribute

BeamAnalysisCache = Mapping[DatasetKey, DatasetCache]

Classes

DatasetCache

Bases: TypedNamedTuple('DatasetCache', [('cache_dict', Mapping[str, PCollection[bytes]]), ('metadata', Optional[Union[PCollection[DatasetCacheMetadata], DatasetCacheMetadata]])])

Complete cache for a dataset as well as metadata.

Functions
get
get(key)
Source code in tensorflow_transform/beam/analyzer_cache.py
def get(self, key):
  return self.cache_dict.get(key)
items
items()
Source code in tensorflow_transform/beam/analyzer_cache.py
def items(self):
  return self.cache_dict.items()
keys
keys()
Source code in tensorflow_transform/beam/analyzer_cache.py
def keys(self):
  return self.cache_dict.keys()
values
values()
Source code in tensorflow_transform/beam/analyzer_cache.py
def values(self):
  return self.cache_dict.values()
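
Example (an illustrative sketch, not from the library): assembling a DatasetCache by hand inside a pipeline. The entry key and encoded payload below are hypothetical.

import apache_beam as beam

with beam.Pipeline() as p:
  cache_dict = {
      'entry_key': p | 'CreateEntry' >> beam.Create([b'encoded-accumulator']),
  }
  cache = DatasetCache(
      cache_dict=cache_dict,
      metadata=DatasetCacheMetadata(dataset_size=100))
  # The mapping helpers delegate to cache_dict:
  assert cache.get('entry_key') is cache_dict['entry_key']
  assert list(cache.keys()) == ['entry_key']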

DatasetCacheMetadata

Bases: TypedNamedTuple('DatasetCacheMetadata', [('dataset_size', int)])

Metadata about a cached dataset.

Functions
decode classmethod
decode(value: bytes) -> DatasetCacheMetadata
Source code in tensorflow_transform/beam/analyzer_cache.py
@classmethod
def decode(cls, value: bytes) -> 'DatasetCacheMetadata':
  return cls(**pickle.loads(value))
encode
encode() -> bytes
Source code in tensorflow_transform/beam/analyzer_cache.py
def encode(self) -> bytes:
  return pickle.dumps(self._asdict(), protocol=0)
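
Example (an illustrative round trip): encode pickles the tuple's fields and decode rebuilds an equal instance.

metadata = DatasetCacheMetadata(dataset_size=1000)
blob = metadata.encode()  # bytes produced with pickle protocol 0
assert DatasetCacheMetadata.decode(blob) == metadata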

DatasetKey

Bases: namedtuple('DatasetKey', ['key', 'is_cached'])

A key for a dataset used for analysis.

Functions
is_flattened_dataset_key
is_flattened_dataset_key() -> bool
Source code in tensorflow_transform/beam/analyzer_cache.py
def is_flattened_dataset_key(self) -> bool:
  return self.key == self._FLATTENED_DATASET_KEY
non_cacheable
non_cacheable() -> DatasetKey

Creates a non-cacheable dataset key, for which no cache will be produced.

Source code in tensorflow_transform/beam/analyzer_cache.py
def non_cacheable(self) -> 'DatasetKey':
  """Creates a non cacheable dataset key, for which no cache will be produced."""
  return self._replace(key=f'uncached_{self.key}', is_cached=False)
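
Example (illustrative; the key name is hypothetical):

key = DatasetKey('day_1', is_cached=True)
assert not key.is_flattened_dataset_key()
uncached = key.non_cacheable()
assert uncached.key == 'uncached_day_1'
assert not uncached.is_cached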

ReadAnalysisCacheFromFS

ReadAnalysisCacheFromFS(
    cache_base_dir: str,
    dataset_keys: Iterable[DatasetKey],
    cache_entry_keys: Optional[Iterable[bytes]] = None,
    source: Optional[object] = None,
)

Bases: PTransform

Reads cache from the FS written by WriteAnalysisCacheToFS.

Init method.

PARAMETER DESCRIPTION
cache_base_dir

A string, the path that the cache should be stored in.

TYPE: str

dataset_keys

An iterable of DatasetKeys.

TYPE: Iterable[DatasetKey]

cache_entry_keys

(Optional) An iterable of cache entry key strings. If provided, only cache entries that exist in cache_entry_keys will be read.

TYPE: Optional[Iterable[bytes]] DEFAULT: None

source

(Optional) A PTransform class that takes a path argument in its constructor, and is used to read the cache.

TYPE: Optional[object] DEFAULT: None

Source code in tensorflow_transform/beam/analyzer_cache.py
def __init__(self,
             cache_base_dir: str,
             dataset_keys: Iterable[DatasetKey],
             cache_entry_keys: Optional[Iterable[bytes]] = None,
             source: Optional[object] = None):
  """Init method.

  Args:
    cache_base_dir: A string, the path that the cache should be stored in.
    dataset_keys: An iterable of `DatasetKey`s.
    cache_entry_keys: (Optional) An iterable of cache entry key strings. If
      provided, only cache entries that exist in `cache_entry_keys` will be
      read.
    source: (Optional) A PTransform class that takes a path argument in its
      constructor, and is used to read the cache.
  """
  self._cache_base_dir = cache_base_dir
  if not all(isinstance(d, DatasetKey) for d in dataset_keys):
    raise ValueError('Expected dataset_keys to be of type DatasetKey')
  self._sorted_dataset_keys = sorted(dataset_keys)
  self._filtered_cache_entry_keys = (None if cache_entry_keys is None else
                                     set(cache_entry_keys))
  # TODO(b/37788560): Possibly use Riegeli as a default file format once
  # possible.
  self._source = source if source is not None else beam.io.ReadFromTFRecord
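
Example (a hedged usage sketch; it assumes cache was previously written under the hypothetical path below):

import apache_beam as beam

dataset_keys = [
    DatasetKey('day_1', is_cached=True),
    DatasetKey('day_2', is_cached=True),
]
with beam.Pipeline() as p:
  # input_cache is a Mapping[DatasetKey, DatasetCache] (a BeamAnalysisCache);
  # datasets with no cache on disk are simply absent from the mapping.
  input_cache = p | ReadAnalysisCacheFromFS(
      cache_base_dir='/tmp/tft_cache', dataset_keys=dataset_keys)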
Attributes
label property writable
label
pipeline class-attribute instance-attribute
pipeline = None
side_inputs class-attribute instance-attribute
side_inputs = ()
Functions
annotations
annotations() -> dict[str, Union[bytes, str, Message]]
Source code in apache_beam/transforms/ptransform.py
def annotations(self) -> dict[str, Union[bytes, str, message.Message]]:
  return {
      'python_type':  #
      f'{self.__class__.__module__}.{self.__class__.__qualname__}'
  }
default_label
default_label()
Source code in apache_beam/transforms/ptransform.py
def default_label(self):
  # type: () -> str
  return self.__class__.__name__
default_type_hints
default_type_hints()
Source code in apache_beam/transforms/ptransform.py
def default_type_hints(self):
  fn_type_hints = IOTypeHints.from_callable(self.expand)
  if fn_type_hints is not None:
    fn_type_hints = fn_type_hints.strip_pcoll()

  # Prefer class decorator type hints for backwards compatibility.
  return get_type_hints(self.__class__).with_defaults(fn_type_hints)
display_data
display_data()

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

RETURNS DESCRIPTION

Dict[str, Any]: A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:

{
    'key1': 'string_value',
    'key2': 1234,
    'key3': 3.14159265,
    'key4': DisplayDataItem('apache.org', url='http://apache.org'),
    'key5': subComponent
}

Source code in apache_beam/transforms/display.py
def display_data(self):
  # type: () -> dict

  """ Returns the display data associated to a pipeline component.

  It should be reimplemented in pipeline components that wish to have
  static display data.

  Returns:
    Dict[str, Any]: A dictionary containing ``key:value`` pairs.
    The value might be an integer, float or string value; a
    :class:`DisplayDataItem` for values that have more data
    (e.g. short value, label, url); or a :class:`HasDisplayData` instance
    that has more display data that should be picked up. For example::

      {
        'key1': 'string_value',
        'key2': 1234,
        'key3': 3.14159265,
        'key4': DisplayDataItem('apache.org', url='http://apache.org'),
        'key5': subComponent
      }
  """
  return {}
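
For example, a component that wants static display data could override it as follows (an illustrative sketch; the class name and values are hypothetical):

import apache_beam as beam
from apache_beam.transforms.display import DisplayDataItem

class CacheAwareTransform(beam.PTransform):
  def display_data(self):
    # Static display data surfaced by pipeline UIs that support it.
    return {
        'cache_base_dir': '/tmp/tft_cache',
        'docs': DisplayDataItem('apache.org', url='http://apache.org'),
    }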
expand
expand(pipeline: Pipeline)
Source code in tensorflow_transform/beam/analyzer_cache.py
def expand(self, pipeline: beam.Pipeline):
  result = {}

  for dataset_key_idx, dataset_key in enumerate(self._sorted_dataset_keys):

    dataset_cache_path = _get_dataset_cache_path(self._cache_base_dir,
                                                 dataset_key)
    manifest_file = _ManifestFile(dataset_cache_path)
    manifest = manifest_file.read()
    if not manifest:
      continue
    dataset_id = f'AnalysisIndex{dataset_key_idx}'
    cache_dict = {}
    for key, cache_key_idx in manifest.items():
      if self._should_read_cache_entry_key(key):
        cache_dict[key] = (
            pipeline
            | f'Read[{dataset_id}][CacheKeyIndex{cache_key_idx}]' >>
            self._source(
                f'{os.path.join(dataset_cache_path, str(cache_key_idx))}-*-of-*'
            ))
    metadata = pipeline | f'ReadMetadata[{dataset_id}]' >> _ReadMetadata(
        dataset_cache_path)
    result[dataset_key] = DatasetCache(cache_dict, metadata)
  return result
from_runner_api classmethod
from_runner_api(proto, context)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def from_runner_api(cls,
                    proto,  # type: Optional[beam_runner_api_pb2.PTransform]
                    context  # type: PipelineContext
                   ):
  # type: (...) -> Optional[PTransform]
  if proto is None or proto.spec is None or not proto.spec.urn:
    return None
  parameter_type, constructor = cls._known_urns[proto.spec.urn]

  return constructor(
      proto,
      proto_utils.parse_Bytes(proto.spec.payload, parameter_type),
      context)
get_resource_hints
get_resource_hints()
Source code in apache_beam/transforms/ptransform.py
def get_resource_hints(self):
  # type: () -> dict[str, bytes]
  if '_resource_hints' not in self.__dict__:
    # PTransform subclasses don't always call super(), so prefer lazy
    # initialization. By default, transforms don't have any resource hints.
    self._resource_hints = {}  # type: dict[str, bytes]
  return self._resource_hints
get_type_hints
get_type_hints()

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order: using self.default_type_hints(), then using self.__class__ type hints.

Source code in apache_beam/typehints/decorators.py
def get_type_hints(self):
  """Gets and/or initializes type hints for this object.

  If type hints have not been set, attempts to initialize type hints in this
  order:
  - Using self.default_type_hints().
  - Using self.__class__ type hints.
  """
  return (
      self._get_or_create_type_hints().with_defaults(
          self.default_type_hints()).with_defaults(
              get_type_hints(self.__class__)))
get_windowing
get_windowing(inputs)

Returns the window function to be associated with the transform's output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Source code in apache_beam/transforms/ptransform.py
def get_windowing(self, inputs):
  # type: (Any) -> Windowing

  """Returns the window function to be associated with transform's output.

  By default most transforms just return the windowing function associated
  with the input PCollection (or the first input if several).
  """
  if inputs:
    return inputs[0].windowing
  else:
    from apache_beam.transforms.core import Windowing
    from apache_beam.transforms.window import GlobalWindows
    # TODO(robertwb): Return something compatible with every windowing?
    return Windowing(GlobalWindows())
infer_output_type
infer_output_type(unused_input_type)
Source code in apache_beam/transforms/ptransform.py
def infer_output_type(self, unused_input_type):
  return self.get_type_hints().simple_output_type(self.label) or typehints.Any
register_urn classmethod
register_urn(urn, parameter_type)
register_urn(urn, parameter_type, constructor)
register_urn(urn, parameter_type, constructor=None)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def register_urn(cls, urn, parameter_type, constructor=None):
  def register(constructor):
    if isinstance(constructor, type):
      constructor.from_runner_api_parameter = register(
          constructor.from_runner_api_parameter)
    else:
      cls._known_urns[urn] = parameter_type, constructor
    return constructor

  if constructor:
    # Used as a statement.
    register(constructor)
  else:
    # Used as a decorator.
    return register
runner_api_requires_keyed_input
runner_api_requires_keyed_input()
Source code in apache_beam/transforms/ptransform.py
def runner_api_requires_keyed_input(self):
  return False
to_runner_api
to_runner_api(context, has_parts=False, **extra_kwargs)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api(self, context, has_parts=False, **extra_kwargs):
  # type: (PipelineContext, bool, Any) -> beam_runner_api_pb2.FunctionSpec
  from apache_beam.portability.api import beam_runner_api_pb2
  # typing: only ParDo supports extra_kwargs
  urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
  if urn == python_urns.GENERIC_COMPOSITE_TRANSFORM and not has_parts:
    # TODO(https://github.com/apache/beam/issues/18713): Remove this fallback.
    urn, typed_param = self.to_runner_api_pickled(context)
  return beam_runner_api_pb2.FunctionSpec(
      urn=urn,
      payload=typed_param.SerializeToString() if isinstance(
          typed_param, message.Message) else typed_param.encode('utf-8')
      if isinstance(typed_param, str) else typed_param)
to_runner_api_parameter
to_runner_api_parameter(unused_context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_parameter(
    self,
    unused_context  # type: PipelineContext
):
  # type: (...) -> tuple[str, Optional[Union[message.Message, bytes, str]]]
  # The payload here is just to ease debugging.
  return (
      python_urns.GENERIC_COMPOSITE_TRANSFORM,
      getattr(self, '_fn_api_payload', str(self)))
to_runner_api_pickled
to_runner_api_pickled(context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_pickled(self, context):
  # type: (PipelineContext) -> tuple[str, bytes]
  return (
      python_urns.PICKLED_TRANSFORM,
      pickler.dumps(
          self,
          enable_best_effort_determinism=context.
          enable_best_effort_deterministic_pickling,
      ),
  )
type_check_inputs
type_check_inputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs(self, pvalueish):
  self.type_check_inputs_or_outputs(pvalueish, 'input')
type_check_inputs_or_outputs
type_check_inputs_or_outputs(pvalueish, input_or_output)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs_or_outputs(self, pvalueish, input_or_output):
  type_hints = self.get_type_hints()
  hints = getattr(type_hints, input_or_output + '_types')
  if hints is None or not any(hints):
    return
  arg_hints, kwarg_hints = hints
  if arg_hints and kwarg_hints:
    raise TypeCheckError(
        'PTransform cannot have both positional and keyword type hints '
        'without overriding %s._type_check_%s()' %
        (self.__class__, input_or_output))
  root_hint = (
      arg_hints[0] if len(arg_hints) == 1 else arg_hints or kwarg_hints)
  for context, pvalue_, hint in _ZipPValues().visit(pvalueish, root_hint):
    if isinstance(pvalue_, DoOutputsTuple):
      continue
    if pvalue_.element_type is None:
      # TODO(robertwb): It's a bug that we ever get here. (typecheck)
      continue
    if hint and not typehints.is_consistent_with(pvalue_.element_type, hint):
      at_context = ' %s %s' % (input_or_output, context) if context else ''
      raise TypeCheckError(
          '{type} type hint violation at {label}{context}: expected {hint}, '
          'got {actual_type}'.format(
              type=input_or_output.title(),
              label=self.label,
              context=at_context,
              hint=hint,
              actual_type=pvalue_.element_type))
type_check_outputs
type_check_outputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_outputs(self, pvalueish):
  self.type_check_inputs_or_outputs(pvalueish, 'output')
with_input_types
with_input_types(input_type_hint)

Annotates the input type of a PTransform with a type-hint.

PARAMETER DESCRIPTION
input_type_hint

An instance of an allowed built-in type, a custom class, or an instance of an apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_input_types(self, input_type_hint):
  """Annotates the input type of a :class:`PTransform` with a type-hint.

  Args:
    input_type_hint (type): An instance of an allowed built-in type, a custom
      class, or an instance of a
      :class:`~apache_beam.typehints.typehints.TypeConstraint`.

  Raises:
    TypeError: If **input_type_hint** is not a valid type-hint.
      See
      :obj:`apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to the instance of this particular
    :class:`PTransform` object. This allows chaining type-hinting related
    methods.
  """
  input_type_hint = native_type_compatibility.convert_to_beam_type(
      input_type_hint)
  validate_composite_type_param(
      input_type_hint, 'Type hints for a PTransform')
  return super().with_input_types(input_type_hint)
with_output_types
with_output_types(type_hint)

Annotates the output type of a PTransform with a type-hint.

PARAMETER DESCRIPTION
type_hint

An instance of an allowed built-in type, a custom class, or an apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_output_types(self, type_hint):
  """Annotates the output type of a :class:`PTransform` with a type-hint.

  Args:
    type_hint (type): An instance of an allowed built-in type, a custom class,
      or a :class:`~apache_beam.typehints.typehints.TypeConstraint`.

  Raises:
    TypeError: If **type_hint** is not a valid type-hint. See
      :obj:`~apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to the instance of this particular
    :class:`PTransform` object. This allows chaining type-hinting related
    methods.
  """
  type_hint = native_type_compatibility.convert_to_beam_type(type_hint)
  validate_composite_type_param(type_hint, 'Type hints for a PTransform')
  return super().with_output_types(type_hint)
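
A brief chaining sketch covering both methods above (MyReadTransform is a hypothetical PTransform subclass):

transform = (
    MyReadTransform()
    .with_input_types(bytes)    # declare the consumed element type
    .with_output_types(bytes))  # declare the produced element type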
with_resource_hints
with_resource_hints(**kwargs)

Adds resource hints to the PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam runners, and runners may ignore unsupported hints.

PARAMETER DESCRIPTION
**kwargs

Key-value pairs describing hints and their values.

DEFAULT: {}

RAISES DESCRIPTION
ValueError

If provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular PTransform object.

Source code in apache_beam/transforms/ptransform.py
def with_resource_hints(self, **kwargs):  # type: (...) -> PTransform
  """Adds resource hints to the :class:`PTransform`.

  Resource hints allow users to express constraints on the environment where
  the transform should be executed.  Interpretation of the resource hints is
  defined by Beam Runners. Runners may ignore the unsupported hints.

  Args:
    **kwargs: key-value pairs describing hints and their values.

  Raises:
    ValueError: if provided hints are unknown to the SDK. See
      :mod:`apache_beam.transforms.resources` for a list of known hints.

  Returns:
    PTransform: A reference to the instance of this particular
    :class:`PTransform` object.
  """
  self.get_resource_hints().update(resources.parse_resource_hints(kwargs))
  return self
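
For instance, a known hint such as min_ram can be attached like this (the transform and value are illustrative):

hinted = MyReadTransform().with_resource_hints(min_ram='8GB')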

WriteAnalysisCacheToFS

WriteAnalysisCacheToFS(
    pipeline: Pipeline,
    cache_base_dir: str,
    dataset_keys: Optional[Iterable[DatasetKey]] = None,
    sink: Optional[object] = None,
)

Bases: PTransform

Writes a cache object that can be read by ReadAnalysisCacheFromFS.

Given a cache collection, this writes it to the configured directory. If the configured directory already contains cache, the new cache is merged with the old. NOTE: the merge is determined at Beam graph construction time, so any existing cache must already be present on disk when this transform is constructed.

Init method.

PARAMETER DESCRIPTION
pipeline

A beam Pipeline.

TYPE: Pipeline

cache_base_dir

A str, the path that the cache should be stored in.

TYPE: str

dataset_keys

(Optional) An iterable of strings.

TYPE: Optional[Iterable[DatasetKey]] DEFAULT: None

sink

(Optional) A PTransform class that takes a path in its constructor, and is used to write the cache. If not provided this uses a GZipped TFRecord sink.

TYPE: Optional[object] DEFAULT: None

Source code in tensorflow_transform/beam/analyzer_cache.py
def __init__(self,
             pipeline: beam.Pipeline,
             cache_base_dir: str,
             dataset_keys: Optional[Iterable[DatasetKey]] = None,
             sink: Optional[object] = None):
  """Init method.

  Args:
    pipeline: A beam Pipeline.
    cache_base_dir: A str, the path that the cache should be stored in.
    dataset_keys: (Optional) An iterable of strings.
    sink: (Optional) A PTransform class that takes a path in its constructor,
      and is used to write the cache. If not provided this uses a GZipped
      TFRecord sink.
  """
  self.pipeline = pipeline
  self._cache_base_dir = cache_base_dir
  if dataset_keys is None:
    self._sorted_dataset_keys = None
  else:
    self._sorted_dataset_keys = sorted(dataset_keys)
  self._sink = sink
  if self._sink is None:
    # TODO(b/37788560): Possibly use Riegeli as a default file format once
    # possible.
    self._sink = _WriteToTFRecordGzip
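
Example (a hedged sketch of the write side; the cache contents stand in for a BeamAnalysisCache produced during analysis, and the path, entry key, and payload are hypothetical):

import apache_beam as beam

with beam.Pipeline() as p:
  output_cache = {
      DatasetKey('day_1', is_cached=True): DatasetCache(
          cache_dict={'entry_key': p | beam.Create([b'encoded-accumulator'])},
          metadata=DatasetCacheMetadata(dataset_size=100)),
  }
  _ = output_cache | 'WriteCache' >> WriteAnalysisCacheToFS(
      pipeline=p, cache_base_dir='/tmp/tft_cache')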
Attributes
label property writable
label
pipeline instance-attribute
pipeline = pipeline
side_inputs class-attribute instance-attribute
side_inputs = ()
Functions
annotations
annotations() -> dict[str, Union[bytes, str, Message]]
Source code in apache_beam/transforms/ptransform.py
def annotations(self) -> dict[str, Union[bytes, str, message.Message]]:
  return {
      'python_type':  #
      f'{self.__class__.__module__}.{self.__class__.__qualname__}'
  }
default_label
default_label()
Source code in apache_beam/transforms/ptransform.py
def default_label(self):
  # type: () -> str
  return self.__class__.__name__
default_type_hints
default_type_hints()
Source code in apache_beam/transforms/ptransform.py
def default_type_hints(self):
  fn_type_hints = IOTypeHints.from_callable(self.expand)
  if fn_type_hints is not None:
    fn_type_hints = fn_type_hints.strip_pcoll()

  # Prefer class decorator type hints for backwards compatibility.
  return get_type_hints(self.__class__).with_defaults(fn_type_hints)
display_data
display_data()

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.

RETURNS DESCRIPTION

Dict[str, Any]: A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:

{
    'key1': 'string_value',
    'key2': 1234,
    'key3': 3.14159265,
    'key4': DisplayDataItem('apache.org', url='http://apache.org'),
    'key5': subComponent
}

Source code in apache_beam/transforms/display.py
def display_data(self):
  # type: () -> dict

  """ Returns the display data associated to a pipeline component.

  It should be reimplemented in pipeline components that wish to have
  static display data.

  Returns:
    Dict[str, Any]: A dictionary containing ``key:value`` pairs.
    The value might be an integer, float or string value; a
    :class:`DisplayDataItem` for values that have more data
    (e.g. short value, label, url); or a :class:`HasDisplayData` instance
    that has more display data that should be picked up. For example::

      {
        'key1': 'string_value',
        'key2': 1234,
        'key3': 3.14159265,
        'key4': DisplayDataItem('apache.org', url='http://apache.org'),
        'key5': subComponent
      }
  """
  return {}
expand
expand(dataset_cache_dict)
Source code in tensorflow_transform/beam/analyzer_cache.py
def expand(self, dataset_cache_dict):
  if self._sorted_dataset_keys is None:
    sorted_dataset_keys_list = sorted(dataset_cache_dict.keys())
  else:
    sorted_dataset_keys_list = self._sorted_dataset_keys
    missing_keys = set(dataset_cache_dict.keys()).difference(
        set(sorted_dataset_keys_list))
    if missing_keys:
      raise ValueError(
          'The dataset keys in the cache dictionary must be a subset of the '
          'keys in dataset_keys. Missing {}.'.format(missing_keys))
  if not all(isinstance(d, DatasetKey) for d in sorted_dataset_keys_list):
    raise ValueError('Expected dataset_keys to be of type DatasetKey')

  cache_is_written = []
  for dataset_key, cache in dataset_cache_dict.items():
    dataset_key_idx = sorted_dataset_keys_list.index(dataset_key)
    dataset_key_dir = _get_dataset_cache_path(self._cache_base_dir,
                                              dataset_key)
    with _ManifestFile(dataset_key_dir) as manifest_file:
      cache_is_written.extend(
          self._write_cache(manifest_file, dataset_key_idx, dataset_key_dir,
                            cache))

  return cache_is_written
from_runner_api classmethod
from_runner_api(proto, context)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def from_runner_api(cls,
                    proto,  # type: Optional[beam_runner_api_pb2.PTransform]
                    context  # type: PipelineContext
                   ):
  # type: (...) -> Optional[PTransform]
  if proto is None or proto.spec is None or not proto.spec.urn:
    return None
  parameter_type, constructor = cls._known_urns[proto.spec.urn]

  return constructor(
      proto,
      proto_utils.parse_Bytes(proto.spec.payload, parameter_type),
      context)
get_resource_hints
get_resource_hints()
Source code in apache_beam/transforms/ptransform.py
def get_resource_hints(self):
  # type: () -> dict[str, bytes]
  if '_resource_hints' not in self.__dict__:
    # PTransform subclasses don't always call super(), so prefer lazy
    # initialization. By default, transforms don't have any resource hints.
    self._resource_hints = {}  # type: dict[str, bytes]
  return self._resource_hints
get_type_hints
get_type_hints()

Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order: using self.default_type_hints(), then using self.__class__ type hints.

Source code in apache_beam/typehints/decorators.py
def get_type_hints(self):
  """Gets and/or initializes type hints for this object.

  If type hints have not been set, attempts to initialize type hints in this
  order:
  - Using self.default_type_hints().
  - Using self.__class__ type hints.
  """
  return (
      self._get_or_create_type_hints().with_defaults(
          self.default_type_hints()).with_defaults(
              get_type_hints(self.__class__)))
get_windowing
get_windowing(inputs)

Returns the window function to be associated with the transform's output.

By default most transforms just return the windowing function associated with the input PCollection (or the first input if several).

Source code in apache_beam/transforms/ptransform.py
def get_windowing(self, inputs):
  # type: (Any) -> Windowing

  """Returns the window function to be associated with transform's output.

  By default most transforms just return the windowing function associated
  with the input PCollection (or the first input if several).
  """
  if inputs:
    return inputs[0].windowing
  else:
    from apache_beam.transforms.core import Windowing
    from apache_beam.transforms.window import GlobalWindows
    # TODO(robertwb): Return something compatible with every windowing?
    return Windowing(GlobalWindows())
infer_output_type
infer_output_type(unused_input_type)
Source code in apache_beam/transforms/ptransform.py
def infer_output_type(self, unused_input_type):
  return self.get_type_hints().simple_output_type(self.label) or typehints.Any
register_urn classmethod
register_urn(urn, parameter_type)
register_urn(urn, parameter_type, constructor)
register_urn(urn, parameter_type, constructor=None)
Source code in apache_beam/transforms/ptransform.py
@classmethod
def register_urn(cls, urn, parameter_type, constructor=None):
  def register(constructor):
    if isinstance(constructor, type):
      constructor.from_runner_api_parameter = register(
          constructor.from_runner_api_parameter)
    else:
      cls._known_urns[urn] = parameter_type, constructor
    return constructor

  if constructor:
    # Used as a statement.
    register(constructor)
  else:
    # Used as a decorator.
    return register
runner_api_requires_keyed_input
runner_api_requires_keyed_input()
Source code in apache_beam/transforms/ptransform.py
def runner_api_requires_keyed_input(self):
  return False
to_runner_api
to_runner_api(context, has_parts=False, **extra_kwargs)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api(self, context, has_parts=False, **extra_kwargs):
  # type: (PipelineContext, bool, Any) -> beam_runner_api_pb2.FunctionSpec
  from apache_beam.portability.api import beam_runner_api_pb2
  # typing: only ParDo supports extra_kwargs
  urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
  if urn == python_urns.GENERIC_COMPOSITE_TRANSFORM and not has_parts:
    # TODO(https://github.com/apache/beam/issues/18713): Remove this fallback.
    urn, typed_param = self.to_runner_api_pickled(context)
  return beam_runner_api_pb2.FunctionSpec(
      urn=urn,
      payload=typed_param.SerializeToString() if isinstance(
          typed_param, message.Message) else typed_param.encode('utf-8')
      if isinstance(typed_param, str) else typed_param)
to_runner_api_parameter
to_runner_api_parameter(unused_context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_parameter(
    self,
    unused_context  # type: PipelineContext
):
  # type: (...) -> tuple[str, Optional[Union[message.Message, bytes, str]]]
  # The payload here is just to ease debugging.
  return (
      python_urns.GENERIC_COMPOSITE_TRANSFORM,
      getattr(self, '_fn_api_payload', str(self)))
to_runner_api_pickled
to_runner_api_pickled(context)
Source code in apache_beam/transforms/ptransform.py
def to_runner_api_pickled(self, context):
  # type: (PipelineContext) -> tuple[str, bytes]
  return (
      python_urns.PICKLED_TRANSFORM,
      pickler.dumps(
          self,
          enable_best_effort_determinism=context.
          enable_best_effort_deterministic_pickling,
      ),
  )
type_check_inputs
type_check_inputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs(self, pvalueish):
  self.type_check_inputs_or_outputs(pvalueish, 'input')
type_check_inputs_or_outputs
type_check_inputs_or_outputs(pvalueish, input_or_output)
Source code in apache_beam/transforms/ptransform.py
def type_check_inputs_or_outputs(self, pvalueish, input_or_output):
  type_hints = self.get_type_hints()
  hints = getattr(type_hints, input_or_output + '_types')
  if hints is None or not any(hints):
    return
  arg_hints, kwarg_hints = hints
  if arg_hints and kwarg_hints:
    raise TypeCheckError(
        'PTransform cannot have both positional and keyword type hints '
        'without overriding %s._type_check_%s()' %
        (self.__class__, input_or_output))
  root_hint = (
      arg_hints[0] if len(arg_hints) == 1 else arg_hints or kwarg_hints)
  for context, pvalue_, hint in _ZipPValues().visit(pvalueish, root_hint):
    if isinstance(pvalue_, DoOutputsTuple):
      continue
    if pvalue_.element_type is None:
      # TODO(robertwb): It's a bug that we ever get here. (typecheck)
      continue
    if hint and not typehints.is_consistent_with(pvalue_.element_type, hint):
      at_context = ' %s %s' % (input_or_output, context) if context else ''
      raise TypeCheckError(
          '{type} type hint violation at {label}{context}: expected {hint}, '
          'got {actual_type}'.format(
              type=input_or_output.title(),
              label=self.label,
              context=at_context,
              hint=hint,
              actual_type=pvalue_.element_type))
type_check_outputs
type_check_outputs(pvalueish)
Source code in apache_beam/transforms/ptransform.py
def type_check_outputs(self, pvalueish):
  self.type_check_inputs_or_outputs(pvalueish, 'output')
with_input_types
with_input_types(input_type_hint)

Annotates the input type of a PTransform with a type-hint.

PARAMETER DESCRIPTION
input_type_hint

An instance of an allowed built-in type, a custom class, or an instance of an apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If input_type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_input_types(self, input_type_hint):
  """Annotates the input type of a :class:`PTransform` with a type-hint.

  Args:
    input_type_hint (type): An instance of an allowed built-in type, a custom
      class, or an instance of a
      :class:`~apache_beam.typehints.typehints.TypeConstraint`.

  Raises:
    TypeError: If **input_type_hint** is not a valid type-hint.
      See
      :obj:`apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to the instance of this particular
    :class:`PTransform` object. This allows chaining type-hinting related
    methods.
  """
  input_type_hint = native_type_compatibility.convert_to_beam_type(
      input_type_hint)
  validate_composite_type_param(
      input_type_hint, 'Type hints for a PTransform')
  return super().with_input_types(input_type_hint)
with_output_types
with_output_types(type_hint)

Annotates the output type of a PTransform with a type-hint.

PARAMETER DESCRIPTION
type_hint

An instance of an allowed built-in type, a custom class, or an apache_beam.typehints.typehints.TypeConstraint.

TYPE: type

RAISES DESCRIPTION
TypeError

If type_hint is not a valid type-hint. See apache_beam.typehints.typehints.validate_composite_type_param() for further details.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular PTransform object. This allows chaining type-hinting related methods.

Source code in apache_beam/transforms/ptransform.py
def with_output_types(self, type_hint):
  """Annotates the output type of a :class:`PTransform` with a type-hint.

  Args:
    type_hint (type): An instance of an allowed built-in type, a custom class,
      or a :class:`~apache_beam.typehints.typehints.TypeConstraint`.

  Raises:
    TypeError: If **type_hint** is not a valid type-hint. See
      :obj:`~apache_beam.typehints.typehints.validate_composite_type_param()`
      for further details.

  Returns:
    PTransform: A reference to the instance of this particular
    :class:`PTransform` object. This allows chaining type-hinting related
    methods.
  """
  type_hint = native_type_compatibility.convert_to_beam_type(type_hint)
  validate_composite_type_param(type_hint, 'Type hints for a PTransform')
  return super().with_output_types(type_hint)
with_resource_hints
with_resource_hints(**kwargs)

Adds resource hints to the PTransform.

Resource hints allow users to express constraints on the environment where the transform should be executed. Interpretation of the resource hints is defined by Beam runners, and runners may ignore unsupported hints.

PARAMETER DESCRIPTION
**kwargs

Key-value pairs describing hints and their values.

DEFAULT: {}

RAISES DESCRIPTION
ValueError

If provided hints are unknown to the SDK. See apache_beam.transforms.resources for a list of known hints.

RETURNS DESCRIPTION
PTransform

A reference to the instance of this particular PTransform object.

Source code in apache_beam/transforms/ptransform.py
def with_resource_hints(self, **kwargs):  # type: (...) -> PTransform
  """Adds resource hints to the :class:`PTransform`.

  Resource hints allow users to express constraints on the environment where
  the transform should be executed.  Interpretation of the resource hints is
  defined by Beam Runners. Runners may ignore the unsupported hints.

  Args:
    **kwargs: key-value pairs describing hints and their values.

  Raises:
    ValueError: if provided hints are unknown to the SDK. See
      :mod:`apache_beam.transforms.resources` for a list of known hints.

  Returns:
    PTransform: A reference to the instance of this particular
    :class:`PTransform` object.
  """
  self.get_resource_hints().update(resources.parse_resource_hints(kwargs))
  return self

Functions

make_cache_entry_key

make_cache_entry_key(cache_key: str) -> bytes
Source code in tensorflow_transform/beam/analyzer_cache.py
def make_cache_entry_key(cache_key: str) -> bytes:
  # tf.compat.as_bytes returns bytes and _CACHE_VERSION is a bytes prefix,
  # so the result is bytes rather than str.
  return _CACHE_VERSION + tf.compat.as_bytes(cache_key)
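
An illustrative call (the analyzer cache key string is hypothetical):

entry_key = make_cache_entry_key('CacheableCombineAccumulate[x/mean_and_var]')
# entry_key is bytes: the internal cache version prefix followed by the
# UTF-8 encoded key.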

validate_dataset_keys

validate_dataset_keys(dataset_keys: Iterable[DatasetKey])
Source code in tensorflow_transform/beam/analyzer_cache.py
def validate_dataset_keys(dataset_keys: Iterable[DatasetKey]):
  regex = re.compile(r'^[a-zA-Z0-9\.\-_]+$')
  for dataset_key in dataset_keys:
    if not isinstance(dataset_key, DatasetKey):
      raise ValueError(
          'Dataset key {} must be of type DatasetKey'.format(dataset_key))
    if not regex.match(dataset_key.key):
      raise ValueError(
          'Dataset key {!r} does not match allowed pattern: {!r}'.format(
              dataset_key.key, regex.pattern))
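
Illustrative checks (key names are hypothetical):

validate_dataset_keys([DatasetKey('day_1-2021.01.01', is_cached=True)])  # passes
# A key containing characters outside [a-zA-Z0-9.\-_], e.g. 'day/1',
# would raise ValueError here.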