Module pipeline_dp.dp_engine
DP aggregations.
Expand source code
"""DP aggregations."""
import math
import dataclasses
import functools
from typing import Any, Callable, Tuple
import numpy as np
import pipeline_dp
from pipeline_dp import combiners
import pipeline_dp.report_generator as report_generator
import pydp.algorithms.partition_selection as partition_selection
@dataclasses.dataclass
class DataExtractors:
"""Data extractors.
A set of functions that, given an input, return the privacy id, partition key,
and value.
"""
privacy_id_extractor: Callable = None
partition_extractor: Callable = None
value_extractor: Callable = None
class DPEngine:
"""Performs DP aggregations."""
def __init__(self, budget_accountant: 'BudgetAccountant',
backend: 'PipelineBackend'):
self._budget_accountant = budget_accountant
self._backend = backend
self._report_generators = []
def _add_report_stage(self, text):
self._report_generators[-1].add_stage(text)
def aggregate(self, col, params: pipeline_dp.AggregateParams,
data_extractors: DataExtractors):
"""Computes DP aggregation metrics.
Args:
col: collection with elements of the same type.
params: specifies which metrics to compute and computation parameters.
data_extractors: functions that extract needed pieces of information
from elements of 'col'.
"""
self._check_aggregate_params(col, params, data_extractors)
with self._budget_accountant.scope(weight=params.budget_weight):
return self._aggregate(col, params, data_extractors)
def _aggregate(self, col, params: pipeline_dp.AggregateParams,
data_extractors: DataExtractors):
self._report_generators.append(report_generator.ReportGenerator(params))
combiner = combiners.create_compound_combiner(params,
self._budget_accountant)
if params.public_partitions is not None:
col = self._drop_not_public_partitions(col,
params.public_partitions,
data_extractors)
# Extract the columns.
col = self._backend.map(
col, lambda row: (data_extractors.privacy_id_extractor(row),
data_extractors.partition_extractor(row),
data_extractors.value_extractor(row)),
"Extract (privacy_id, partition_key, value))")
# col : (privacy_id, partition_key, value)
col = self._bound_contributions(col, params.max_partitions_contributed,
params.max_contributions_per_partition,
combiner.create_accumulator)
# col : ((privacy_id, partition_key), accumulator)
col = self._backend.map_tuple(col, lambda pid_pk, v: (pid_pk[1], v),
"Drop privacy id")
# col : (partition_key, accumulator)
if params.public_partitions:
col = self._add_empty_public_partitions(col,
params.public_partitions,
combiner.create_accumulator)
# col : (partition_key, accumulator)
col = self._backend.combine_accumulators_per_key(
col, combiner, "Reduce accumulators per partition key")
# col : (partition_key, accumulator)
if params.public_partitions is None:
col = self._select_private_partitions_internal(
col, params.max_partitions_contributed)
# col : (partition_key, accumulator)
# Compute DP metrics.
col = self._backend.map_values(col, combiner.compute_metrics,
"Compute DP` metrics")
return col
def _check_aggregate_params(self, col, params: pipeline_dp.AggregateParams,
data_extractors: DataExtractors):
if col is None or not col:
raise ValueError("col must be non-empty")
if params is None:
raise ValueError("params must be set to a valid AggregateParams")
if not isinstance(params, pipeline_dp.AggregateParams):
raise TypeError("params must be set to a valid AggregateParams")
if not isinstance(params.max_partitions_contributed,
int) or params.max_partitions_contributed <= 0:
raise ValueError("params.max_partitions_contributed must be set "
"to a positive integer")
if not isinstance(params.max_contributions_per_partition,
int) or params.max_contributions_per_partition <= 0:
raise ValueError(
"params.max_contributions_per_partition must be set "
"to a positive integer")
needs_min_max_value = pipeline_dp.Metrics.SUM in params.metrics
if needs_min_max_value and (params.min_value is None or
params.max_value is None):
raise ValueError(
"params.min_value and params.max_value must be set")
if needs_min_max_value and (
self._not_a_proper_number(params.min_value) or
self._not_a_proper_number(params.max_value)):
raise ValueError(
"params.min_value and params.max_value must be both finite numbers"
)
if needs_min_max_value and params.max_value < params.min_value:
raise ValueError(
"params.max_value must be equal to or greater than params.min_value"
)
if data_extractors is None:
raise ValueError("data_extractors must be set to a DataExtractors")
if not isinstance(data_extractors, pipeline_dp.DataExtractors):
raise TypeError("data_extractors must be set to a DataExtractors")
def _check_select_private_partitions(
self, col, params: pipeline_dp.SelectPrivatePartitionsParams,
data_extractors: DataExtractors):
if col is None or not col:
raise ValueError("col must be non-empty")
if params is None:
raise ValueError(
"params must be set to a valid SelectPrivatePartitionsParams")
if not isinstance(params, pipeline_dp.SelectPrivatePartitionsParams):
raise TypeError(
"params must be set to a valid SelectPrivatePartitionsParams")
if not isinstance(params.max_partitions_contributed,
int) or params.max_partitions_contributed <= 0:
raise ValueError("params.max_partitions_contributed must be set "
"(to a positive integer)")
if data_extractors is None:
raise ValueError("data_extractors must be set to a DataExtractors")
if not isinstance(data_extractors, pipeline_dp.DataExtractors):
raise TypeError("data_extractors must be set to a DataExtractors")
def select_private_partitions(
self, col, params: pipeline_dp.SelectPrivatePartitionsParams,
data_extractors: DataExtractors):
"""Retrieves a collection of differentially-private partitions.
Args:
col: collection with elements of the same type.
params: parameters, see doc for SelectPrivatePartitionsParams.
data_extractors: functions that extract needed pieces of information
from elements of 'col'. Only privacy_id_extractor and partition_extractor are required;
value_extractor is not used.
"""
self._check_select_private_partitions(col, params, data_extractors)
self._report_generators.append(report_generator.ReportGenerator(params))
max_partitions_contributed = params.max_partitions_contributed
# Extract the columns.
col = self._backend.map(
col, lambda row: (data_extractors.privacy_id_extractor(row),
data_extractors.partition_extractor(row)),
"Extract (privacy_id, partition_key))")
# col : (privacy_id, partition_key)
# Apply cross-partition contribution bounding
col = self._backend.group_by_key(col, "Group by privacy_id")
# col : (privacy_id, [partition_key])
# Note: This may not be scalable if a single privacy ID contributes
# to _way_ too many partitions.
def sample_unique_elements_fn(pid_and_pks):
pid, pks = pid_and_pks
unique_pks = list(set(pks))
if len(unique_pks) <= max_partitions_contributed:
sampled_elements = unique_pks
else:
# np.random.choice casts elements to numpy types, which is
# undesirable for 2 reasons:
# 1. Apache Beam cannot serialize numpy types.
# 2. It might lose precision (e.g. an arbitrary-precision int is
# converted to int64).
# So np.random.choice is applied to indices rather than directly
# to 'unique_pks'.
sampled_indices = np.random.choice(np.arange(len(unique_pks)),
max_partitions_contributed,
replace=False)
sampled_elements = [unique_pks[i] for i in sampled_indices]
return ((pid, pk) for pk in sampled_elements)
col = self._backend.flat_map(col, sample_unique_elements_fn,
"Sample cross-partition contributions")
# col : (privacy_id, partition_key)
# A compound accumulator without any child accumulators is used to calculate the raw privacy ID count.
compound_combiner = combiners.CompoundCombiner([])
col = self._backend.map_tuple(
col, lambda pid, pk: (pk, compound_combiner.create_accumulator([])),
"Drop privacy id and add accumulator")
# col : (partition_key, accumulator)
col = self._backend.combine_accumulators_per_key(
col, compound_combiner, "Combine accumulators per partition key")
# col : (partition_key, accumulator)
col = self._select_private_partitions_internal(
col, max_partitions_contributed)
col = self._backend.keys(col,
"Drop accumulators, keep only partition keys")
return col
def _drop_not_public_partitions(self, col, public_partitions,
data_extractors: DataExtractors):
"""Drops partitions in `col` which are not in `public_partitions`."""
col = self._backend.map(
col, lambda row: (data_extractors.partition_extractor(row), row),
"Extract partition id")
col = self._backend.filter_by_key(
col, public_partitions, "Filtering out non-public partitions")
self._add_report_stage(
f"Public partition selection: dropped non public partitions")
return self._backend.map_tuple(col, lambda k, v: v, "Drop key")
def _add_empty_public_partitions(self, col, public_partitions,
aggregator_fn):
"""Adds empty accumulators to all `public_partitions` and returns those
empty accumulators joined with `col`."""
self._add_report_stage(
"Adding empty partitions to public partitions that are missing in "
"data")
empty_accumulators = self._backend.map(
public_partitions, lambda partition_key:
(partition_key, aggregator_fn([])), "Build empty accumulators")
return self._backend.flatten(
col, empty_accumulators,
"Join public partitions with partitions from data")
def _bound_contributions(self, col, max_partitions_contributed: int,
max_contributions_per_partition: int,
aggregator_fn):
"""Bounds the contribution by privacy_id in and cross partitions.
Args:
col: collection, with types of each element: (privacy_id,
partition_key, value).
max_partitions_contributed: maximum number of partitions that one
privacy id can contribute to.
max_contributions_per_partition: maximum number of records that one
privacy id can contribute to one partition.
aggregator_fn: function that takes a list of values and returns an
aggregator object which handles all aggregation logic.
Returns:
collection with elements ((privacy_id, partition_key), accumulator).
"""
# Per-partition contribution bounding: cap the number of contributions
# per (privacy_id, partition_key) pair.
col = self._backend.map_tuple(
col, lambda pid, pk, v: ((pid, pk), v),
"Rekey to ( (privacy_id, partition_key), value))")
col = self._backend.sample_fixed_per_key(
col, max_contributions_per_partition,
"Sample per (privacy_id, partition_key)")
self._add_report_stage(
f"Per-partition contribution bounding: randomly selected not "
f"more than {max_contributions_per_partition} contributions")
# ((privacy_id, partition_key), [value])
col = self._backend.map_values(
col, aggregator_fn,
"Apply aggregate_fn after per partition bounding")
# ((privacy_id, partition_key), accumulator)
# Cross partition bounding
col = self._backend.map_tuple(
col, lambda pid_pk, v: (pid_pk[0], (pid_pk[1], v)),
"Rekey to (privacy_id, (partition_key, "
"accumulator))")
col = self._backend.sample_fixed_per_key(col,
max_partitions_contributed,
"Sample per privacy_id")
self._add_report_stage(
f"Cross-partition contribution bounding: randomly selected not more than "
f"{max_partitions_contributed} partitions per user")
# (privacy_id, [(partition_key, accumulator)])
def unnest_cross_partition_bound_sampled_per_key(pid_pk_v):
pid, pk_values = pid_pk_v
return (((pid, pk), v) for (pk, v) in pk_values)
return self._backend.flat_map(
col, unnest_cross_partition_bound_sampled_per_key, "Unnest")
def _select_private_partitions_internal(self, col,
max_partitions_contributed: int):
"""Selects and publishes private partitions.
Args:
col: collection, with types for each element:
(partition_key, Accumulator)
max_partitions_contributed: maximum number of partitions that one
privacy unit can contribute to.
Returns:
collection of elements (partition_key, accumulator)
"""
budget = self._budget_accountant.request_budget(
mechanism_type=pipeline_dp.MechanismType.GENERIC)
def filter_fn(
budget: 'MechanismSpec', max_partitions: int,
row: Tuple[Any,
combiners.CompoundCombiner.AccumulatorType]) -> bool:
"""Lazily creates a partition selection strategy and uses it to determine which
partitions to keep."""
privacy_id_count, _ = row[1]
partition_selection_strategy = (
partition_selection.
create_truncated_geometric_partition_strategy(
budget.eps, budget.delta, max_partitions))
return partition_selection_strategy.should_keep(privacy_id_count)
# make filter_fn serializable
filter_fn = functools.partial(filter_fn, budget,
max_partitions_contributed)
self._add_report_stage(
lambda:
f"Private Partition selection: using {budget.mechanism_type.value} "
f"method with (eps= {budget.eps}, delta = {budget.delta})")
return self._backend.filter(col, filter_fn, "Filter private partitions")
def _not_a_proper_number(self, num):
"""
Returns:
True if num is inf or NaN, False otherwise.
"""
return math.isnan(num) or math.isinf(num)
Classes
class DPEngine (budget_accountant: BudgetAccountant, backend: PipelineBackend)
-
Performs DP aggregations.
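The engine is built from a budget accountant and a pipeline backend. A minimal construction sketch, assuming the standard pipeline_dp top-level helpers NaiveBudgetAccountant and LocalBackend and the top-level DPEngine export (names not defined in this module):

import pipeline_dp

# Privacy budget shared by all aggregations built with this engine.
budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1.0,
                                                      total_delta=1e-6)
# LocalBackend runs the pipeline over in-memory Python iterables.
backend = pipeline_dp.LocalBackend()
dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)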
Methods
def aggregate(self, col, params: AggregateParams, data_extractors: DataExtractors)
-
Computes DP aggregation metrics.
Args
col
- collection with elements of the same type.
params
- specifies which metrics to compute and computation parameters.
data_extractors
- functions that extract needed pieces of information from elements of 'col'.
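A minimal usage sketch for rows of the form (user_id, partition, value), reusing the dp_engine and budget_accountant constructed above; the AggregateParams fields and compute_budgets() call follow the standard pipeline_dp API and are assumptions outside this module:

rows = [("user1", "day1", 1.0), ("user1", "day2", 2.0), ("user2", "day1", 3.0)]

params = pipeline_dp.AggregateParams(
    noise_kind=pipeline_dp.NoiseKind.LAPLACE,
    metrics=[pipeline_dp.Metrics.COUNT],
    max_partitions_contributed=2,
    max_contributions_per_partition=1)
extractors = pipeline_dp.DataExtractors(
    privacy_id_extractor=lambda row: row[0],
    partition_extractor=lambda row: row[1],
    value_extractor=lambda row: row[2])

dp_result = dp_engine.aggregate(rows, params, extractors)
# The budget must be finalized before the lazy result is materialized.
budget_accountant.compute_budgets()
print(list(dp_result))  # [(partition_key, metrics), ...]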
def select_private_partitions(self, col, params: SelectPrivatePartitionsParams, data_extractors: DataExtractors)
-
Retrieves a collection of differentially-private partitions.
Args
col
- collection with elements of the same type.
params
- parameters, see doc for SelectPrivatePartitionsParams.
data_extractors
- functions that extract needed pieces of information from elements of 'col'. Only privacy_id_extractor and partition_extractor are required; value_extractor is not used.
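A hedged usage sketch, reusing dp_engine and budget_accountant from above; value_extractor can be omitted here:

rows = [("user1", "day1"), ("user1", "day2"), ("user2", "day1")]

params = pipeline_dp.SelectPrivatePartitionsParams(
    max_partitions_contributed=2)
extractors = pipeline_dp.DataExtractors(
    privacy_id_extractor=lambda row: row[0],
    partition_extractor=lambda row: row[1])

partitions = dp_engine.select_private_partitions(rows, params, extractors)
budget_accountant.compute_budgets()
print(list(partitions))  # partition keys that survive DP partition selection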
class DataExtractors (privacy_id_extractor: Callable = None, partition_extractor: Callable = None, value_extractor: Callable = None)
-
Data extractors.
A set of functions that, given an input, return the privacy id, partition key, and value.
Class variables
var partition_extractor : Callable
var privacy_id_extractor : Callable
var value_extractor : Callable
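For example, for dictionary-shaped input records (the field names here are purely illustrative):

extractors = pipeline_dp.DataExtractors(
    privacy_id_extractor=lambda row: row["user_id"],
    partition_extractor=lambda row: row["date"],
    value_extractor=lambda row: row["spent"])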