#!/usr/bin/env python3
#
# Copyright (c) 2022-2023 Jan Behrens
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
"""
Randomly choose items while respecting categorization
The algorithm in this module can be used, for example, to choose items
from a pool of questions for an exam questionnaire.
There are two functions: single and batch. The first can be used to create
a single sample of a given size, the second will create several samples at
once with additional constraints regarding the whole batch.
"""
import math
import random
__version__ = "1.1.5"
def _distribute(targets, lows, highs, high_prob):
    """
    Helper function to enforce rounding to not go beyond nearest integers

    Both lows and highs are clamped in place to the floor/ceiling of the
    corresponding target.  Any error introduced by clamping one dict is
    compensated in the opposite dict (weighted by high_prob) and then
    redistributed proportionally among the categories that have not been
    clamped yet, so the expected count per category stays at its target.

    targets:
        dict mapping category to ideal target count
    lows:
        dict mapping category to lowered target counts (e.g. due to
        rounding of parent category), which will be modified by this
        function to not exceed nearest integers of target counts
    highs:
        dict mapping category to raised target counts (e.g. due to
        rounding of parent category), which will be modified by this
        function to not exceed nearest integers of target counts
    high_prob:
        probability of using raised target counts
    """
    # calculate boundaries based on targets
    low_bounds, high_bounds = {}, {}
    for cat, target in targets.items():
        low_bounds[cat] = math.floor(target)
        high_bounds[cat] = math.ceil(target)
    # categories which can still be adjusted
    open_cats = set(targets)
    # iteratively raise lows[cat] and lower highs[cat] as long as they
    # exceed low_bounds[cat] or high_bounds[cat], respectively,
    # and distribute induced error
    while True:
        # errors (positive numbers) introduced in lows or highs
        low_error, high_error = 0, 0
        # limit counts in open categories where needed, mark those categories
        # as no longer open, and remember induced errors
        # (iterate over a copy because open_cats is modified in the loop)
        for cat in open_cats.copy():
            error = low_bounds[cat] - lows[cat]
            if error > 0:
                lows[cat] = low_bounds[cat]
                low_error += error
                # compensate in highs, weighted such that the expectation
                # high_prob*highs + (1-high_prob)*lows stays unchanged
                other_error = error * ((1-high_prob) / high_prob)
                highs[cat] -= other_error
                high_error += other_error
                open_cats.remove(cat)
                continue
            error = highs[cat] - high_bounds[cat]
            if error > 0:
                highs[cat] = high_bounds[cat]
                high_error += error
                # compensate in lows with the inverse weighting (see above)
                other_error = error * (high_prob / (1-high_prob))
                lows[cat] += other_error
                low_error += other_error
                open_cats.remove(cat)
        # stop when there is no more error to be distributed
        if low_error == 0 and high_error == 0: break
        # distribute error in remaining open categories,
        # proportionally to their current counts
        low_total, high_total = 0, 0
        for cat in open_cats:
            low_total += lows[cat]
            high_total += highs[cat]
        for cat in open_cats:
            if low_total:
                lows[cat] -= low_error * lows[cat] / low_total
            if high_total:
                highs[cat] += high_error * highs[cat] / high_total
def _discretize(parent, parent_count, round_up, in_counts, out_counts):
    """
    Recursive helper function discretizing hierarchical counts

    Rounds the (possibly fractional) parent_count according to round_up,
    writes the rounded count to out_counts, and recursively distributes
    it among the child categories using randomized rounding such that
    the children's rounded counts always sum up to the parent's count.

    parent:
        parent category as tuple
    parent_count:
        desired count for parent category, may be non-integer
    round_up:
        boolean determining if parent_count is to be rounded up
        (or down if false)
    in_counts:
        dict mapping leaf-categories to relative (ideal) counts
    out_counts:
        dict, which will be modified by this function to contain resulting
        counts for each category
        (can be sparse, i.e. zeroes are not written)
    """
    # perform both possible roundings of parent_count
    parent_count_floor = math.floor(parent_count)
    parent_count_ceil = math.ceil(parent_count)
    # perform rounding of parent_count according to round_up flag
    if round_up:
        parent_count_rounded = parent_count_ceil
    else:
        parent_count_rounded = parent_count_floor
    # hypothetical rounding probability disregarding round_up flag
    # (fractional part of parent_count, i.e. the probability with which
    # an unbiased randomized rounding would round up)
    parent_count_frac = parent_count % 1
    # stop when no more items are left to be picked
    if parent_count_rounded == 0: return
    # output rounded count for current category
    out_counts[parent] = parent_count_rounded
    # determine hierarchy level
    level = len(parent)
    # determine children and their counts as well as the total count
    children = {}
    total = 0
    for cat, count in in_counts.items():
        if len(cat) > level and cat[0:level] == parent:
            child = cat[0:level+1]
            children[child] = children.get(child, 0) + count
            total += count
    # stop when there are no child categories
    if total == 0: return
    # counts to pick per category
    # (real target, and counts in case of rounding parent_count down or up)
    targets, lows, highs = {}, {}, {}
    for cat, count in children.items():
        target = parent_count * count / total
        targets[cat] = target
        lows[cat] = target * parent_count_floor / parent_count
        highs[cat] = target * parent_count_ceil / parent_count
    # limit counts (lows and highs) to nearest integer
    _distribute(targets, lows, highs, parent_count_frac)
    # use target counts for categories
    # based on demanded rounding of parent_count
    if round_up:
        targets = highs
    else:
        targets = lows
    # round target counts while ensuring that sum equals parent_count_rounded
    rounded_counts = {}
    done_counter = 0
    remaining_targets = dict(targets)
    while done_counter < parent_count_rounded:
        # take one (arbitrary) category out of the remaining ones
        cat, target = remaining_targets.popitem()
        # if it was the last category, it simply receives the leftover
        if not remaining_targets:
            rounded_counts[cat] = parent_count_rounded - done_counter
            break
        # errors induced in the remaining categories when rounding the
        # current target up or down, respectively
        round_up_error = 1 - (target % 1)
        round_down_error = target % 1
        remaining_total = 0
        for remaining_cat, remaining_target in remaining_targets.items():
            remaining_total += remaining_target
        # prepare compensated target variants for the remaining categories
        lows, highs = {}, {}
        for remaining_cat, remaining_target in remaining_targets.items():
            if remaining_total > 0:
                lows[remaining_cat] = (
                    remaining_target -
                    round_up_error * remaining_target / remaining_total
                )
                highs[remaining_cat] = (
                    remaining_target +
                    round_down_error * remaining_target / remaining_total
                )
            else:
                lows[remaining_cat] = math.floor(remaining_target + 0.5)
                highs[remaining_cat] = math.floor(remaining_target + 0.5)
        # using _distribute function in a different context here
        # (clamp the compensated variants to their nearest integers)
        _distribute(remaining_targets, lows, highs, round_up_error)
        # randomized rounding: round down with probability round_up_error
        # (i.e. 1 - frac), up otherwise; the fraction removed or added is
        # absorbed by the remaining categories via highs or lows
        if round_up_error > random.random():
            remaining_targets = highs
            rounded_count = math.floor(target)
        else:
            remaining_targets = lows
            rounded_count = math.ceil(target)
        rounded_counts[cat] = rounded_count
        done_counter += rounded_count
    # categories left over after the loop receive no items
    for cat in remaining_targets:
        rounded_counts[cat] = 0
    # process children
    for cat in children:
        _discretize(
            cat,
            targets[cat],
            rounded_counts[cat] > targets[cat],
            in_counts,
            out_counts
        )
def _batch(
    parent,
    parent_count,
    single_counts,
    batch_counts,
    batch_remaining,
    out_counts
):
    """
    Recursive helper function determining counts for a sample in a batch

    Splits parent_count among the child categories such that each child's
    count is the floor or ceiling of its ideal per-sample count, biased
    towards the average count still needed to finish the batch.

    parent:
        parent category as tuple
    parent_count:
        desired count for parent category, must be integer
    single_counts:
        dict mapping leaf-categories to relative (ideal) counts
    batch_counts:
        sparse dict mapping categories to remaining counts for batch;
        will be modified by this function
    batch_remaining:
        non-zero integer indicating how often this function is called to
        finish the batch (i.e. will be 1 on last call)
    out_counts:
        dict, which will be modified by this function to contain resulting
        counts for each category
        (can be sparse, i.e. zeroes are not written)
    """
    # move count for parent category from batch_counts to out_counts
    if parent in batch_counts:
        batch_counts[parent] -= parent_count
    out_counts[parent] = parent_count
    # determine ideal counts for children
    targets = {}
    level = len(parent)
    for cat in single_counts:
        if len(cat) > level and cat[0:level] == parent:
            child = cat[0:level+1]
            targets[child] = (
                targets.get(child, 0) + single_counts[cat]
            )
    # stop when there are no children
    if not targets: return
    # determine ideal counts in regard to remaining batch
    # (average count per remaining sample to use up batch_counts)
    batch_targets = {}
    for cat in targets:
        batch_targets[cat] = batch_counts.get(cat, 0) / batch_remaining
    # round counts for children based on ideal counts for remaining batch:
    # round in the direction of the batch average for that category
    rounded_targets = {}
    for cat, target in targets.items():
        target_floor = math.floor(target)
        target_ceil = math.ceil(target)
        target_middle = (target_floor + target_ceil) / 2
        if target_middle < batch_targets[cat]:
            rounded_targets[cat] = math.ceil(target)
        else:
            rounded_targets[cat] = math.floor(target)
    # check if sum of children count is correct and adjust rounding if it's not
    total = 0
    for cat, target in rounded_targets.items():
        total += target
    # too few items: repeatedly round up the category whose raised count
    # deviates least from its batch target (ties broken randomly)
    while total < parent_count:
        best_error = None
        candidates = {}
        for cat, target in rounded_targets.items():
            # check if category count can be rounded up
            if targets[cat] > target:
                error = (target + 1) - batch_targets[cat]
                if best_error is None or best_error > error:
                    best_error = error
                candidates[cat] = error
        best_candidates = []
        for candidate, error in candidates.items():
            if error == best_error:
                best_candidates.append(candidate)
        rounded_targets[random.choice(best_candidates)] += 1
        total += 1
    # too many items: analogous adjustment in the other direction
    while total > parent_count:
        best_error = None
        candidates = {}
        for cat, target in rounded_targets.items():
            # check if category count can be rounded down
            if targets[cat] < target:
                error = batch_targets[cat] - (target - 1)
                if best_error is None or best_error > error:
                    best_error = error
                candidates[cat] = error
        best_candidates = []
        for candidate, error in candidates.items():
            if error == best_error:
                best_candidates.append(candidate)
        rounded_targets[random.choice(best_candidates)] -= 1
        total -= 1
    # process children
    for cat, target in rounded_targets.items():
        _batch(
            cat,
            target,
            single_counts,
            batch_counts,
            batch_remaining,
            out_counts
        )
def single(category_mapper, pool, sample_size):
    """
    Create one sample of items drawn from pool, respecting categories

    Returns a list of exactly sample_size items taken from the pool.

    Categories are obtained by calling category_mapper with an item as
    its single argument; it must return a sequence describing the item's
    position in the category hierarchy (e.g. [1, 7, 4] for top
    category 1, subcategory 7, and sub-subcategory 4).

    Every category is represented proportionally (randomly rounded up or
    down to the nearest integer), while each individual item has the
    same probability of being chosen, regardless of how many items of
    the pool share its category.

    As long as sample_size does not exceed the pool length, the sample
    contains no duplicate items.

    category_mapper: function mapping an item to a category (sequence)
    sample_size: number of items desired in returned sample
    """
    # a single sample is simply a batch consisting of exactly one sample
    (sample,) = batch(category_mapper, pool, sample_size, 1)
    return sample
def batch(category_mapper, pool, sample_size, sample_count):
    """
    Create several samples from a pool of items, respecting categories

    Returns a list of samples (of length sample_count), where each element
    (sample) is again a list. Each sample has a length equal to the
    sample_size argument and consists of items from the pool.

    Items are categorized by the category_mapper, which retrieves items of
    the pool as single argument and has to return a sequence (e.g.
    [1, 7, 4] for an item that belongs to top category 1, subcategory 7,
    and sub-subcategory 4).

    Both each sample and the batch of samples will represent each category
    proportionally (randomly rounded up or down to the nearest integer
    value) while the probability of each item to appear in a sample is the
    same, independently of how many items in the pool belong to the
    respective category.

    If sample_size times sample_count is equal to or greater than the pool
    length, each item of the pool will be selected for at least one
    sample.

    category_mapper: function mapping an item to a category (sequence)
    sample_size: number of items per sample
    sample_count: number of samples
    """
    # determine relative counts of leaf-categories and items of each
    # leaf-category and initialize temporary item index list for each category
    in_counts = {}
    subpools = {}
    subpools_idxs = {}
    for item in pool:
        cat = tuple(category_mapper(item))
        in_counts[cat] = in_counts.get(cat, 0) + 1
        if cat in subpools:
            subpools[cat].append(item)
        else:
            subpools[cat] = [item]
            subpools_idxs[cat] = []
    # determine ideal target counts for each sample in batch
    single_counts = {}
    for cat, cat_count in in_counts.items():
        single_counts[cat] = cat_count * sample_size / len(pool)
    # create discrete counts for whole batch
    total_count = sample_size * sample_count
    batch_counts = {}
    _discretize(
        (),
        total_count,
        # randomized round-up flag; always False when total_count is an
        # integer, but kept general for non-integer sample sizes
        total_count % 1 > random.random(),
        in_counts,
        batch_counts,
    )
    # generate samples for batch
    results = []
    for i in range(sample_count):
        # generate category counts for sample
        out_counts = {}
        _batch(
            (),
            # randomized rounding of sample_size to an adjacent integer
            math.floor(sample_size + random.random()),
            single_counts,
            batch_counts,
            sample_count - i,
            out_counts
        )
        # pick required number of items for each category from pool while
        # prohibiting duplicates in a sample
        result = []
        for cat, subpool in subpools.items():
            # subpool_idxs is a shuffled stack of item indices that is
            # kept across samples so items are reused evenly
            subpool_idxs = subpools_idxs[cat]
            used_idxs = set()
            reshuffle = False
            for _ in range(out_counts.get(cat, 0)):
                # refill and reshuffle the index stack when exhausted
                if not subpool_idxs:
                    subpool_idxs = list(range(len(subpool)))
                    random.shuffle(subpool_idxs)
                    subpools_idxs[cat] = subpool_idxs
                # pop indices until one is found that was not already
                # used in this sample; duplicates are pushed back to the
                # bottom of the stack for use in a later sample
                while True:
                    idx = subpool_idxs.pop()
                    if idx in used_idxs:
                        subpool_idxs.insert(0, idx)
                        reshuffle = True
                    else:
                        break
                used_idxs.add(idx)
                result.append(subpool[idx])
            # reshuffle so pushed-back indices are not in a fixed order
            if reshuffle:
                random.shuffle(subpool_idxs)
        # shuffle items in sample
        random.shuffle(result)
        # append sample list to resulting list of samples
        results.append(result)
    # shuffle samples and return them
    random.shuffle(results)
    return results
def _test():
    """Module self-test"""
    def rounding_variance(target):
        # variance of randomly rounding target up (with probability equal
        # to its fractional part) or down
        frac = target % 1
        compl = 1 - frac
        return frac*frac*compl + compl*compl*frac
    def assert_eq(left, right):
        # equality check with tolerance for floating point error
        if not abs(left - right) < 1e-12:
            raise AssertionError("testing assertion failed")
    def assert_range(value, low, high):
        if not low <= value <= high:
            raise AssertionError("testing assertion failed")
    def assert_average(rounds, target, result):
        # statistical check: result must lie within six standard
        # deviations of the expected sum
        average = rounds * target
        stddev = math.sqrt(rounds * rounding_variance(target))
        allowed = 6 * stddev
        assert_range(result, average - allowed, average + allowed)
    print("Running duplicate test...")
    pool = list(range(20))
    samples = batch(lambda x: [None], pool, 15, 100)
    for sample in samples:
        used = set()
        for item in sample:
            used.add(item)
        # no item may appear twice within a sample
        assert_eq(len(sample), len(used))
    print("Running coverage test...")
    pool = list(range(100))
    samples = batch(lambda x: [None], pool, 10, 10)
    covered = set()
    for sample in samples:
        for item in sample:
            covered.add(item)
    # sample_size * sample_count == pool size, so every item must appear
    assert_eq(len(pool), len(covered))
    def category_mapper(item):
        # category is the item's string prefix up to the first "-",
        # one hierarchy level per character
        result = []
        for char in item:
            if char == "-": break
            result.append(char)
        return result
    for i in range(1, 1+10):
        # alternate between single() and batch() based tests
        batch_operation = i % 2 == 0
        if not batch_operation:
            print(
                "Running statistical test #" + str(i) +
                " of 10 (independent)..."
            )
        else:
            sample_count = random.randint(2, 30)
            print(
                "Running statistical test #" + str(i) + " of 10 (with " +
                str(sample_count) + " samples per batch each)..."
            )
        # random pool composition: three top categories (A, B, C) with
        # three subcategories each
        target_a1 = random.randint(1, 30)
        target_a2 = random.randint(1, 30)
        target_a3 = random.randint(1, 30)
        target_b1 = random.randint(1, 12)
        target_b2 = random.randint(1, 12)
        target_b3 = random.randint(1, 12)
        target_c1 = random.randint(1, 6)
        target_c2 = random.randint(1, 6)
        target_c3 = random.randint(1, 6)
        target_a = target_a1 + target_a2 + target_a3
        target_b = target_b1 + target_b2 + target_b3
        target_c = target_c1 + target_c2 + target_c3
        count = target_a + target_b + target_c
        pool = []
        for seq in range(len(pool)+1, len(pool)+1+target_a1):
            pool.append("A1-" + str(seq))
        for seq in range(len(pool)+1, len(pool)+1+target_a2):
            pool.append("A2-" + str(seq))
        for seq in range(len(pool)+1, len(pool)+1+target_a3):
            pool.append("A3-" + str(seq))
        for seq in range(len(pool)+1, len(pool)+1+target_b1):
            pool.append("B1-" + str(seq))
        for seq in range(len(pool)+1, len(pool)+1+target_b2):
            pool.append("B2-" + str(seq))
        for seq in range(len(pool)+1, len(pool)+1+target_b3):
            pool.append("B3-" + str(seq))
        for seq in range(len(pool)+1, len(pool)+1+target_c1):
            pool.append("C1-" + str(seq))
        for seq in range(len(pool)+1, len(pool)+1+target_c2):
            pool.append("C2-" + str(seq))
        for seq in range(len(pool)+1, len(pool)+1+target_c3):
            pool.append("C3-" + str(seq))
        # shuffle pool in every other round to also test order independence
        if i % 2 == 1:
            random.shuffle(pool)
        sample_size = random.randint(1, random.randint(1, count))
        # scale targets from pool counts to expected per-sample counts
        target_a1 *= sample_size / count
        target_a2 *= sample_size / count
        target_a3 *= sample_size / count
        target_b1 *= sample_size / count
        target_b2 *= sample_size / count
        target_b3 *= sample_size / count
        target_c1 *= sample_size / count
        target_c2 *= sample_size / count
        target_c3 *= sample_size / count
        target_a *= sample_size / count
        target_b *= sample_size / count
        target_c *= sample_size / count
        target_a1_floor = math.floor(target_a1)
        target_a1_ceil = math.ceil(target_a1)
        target_a2_floor = math.floor(target_a2)
        target_a2_ceil = math.ceil(target_a2)
        target_a3_floor = math.floor(target_a3)
        target_a3_ceil = math.ceil(target_a3)
        target_b1_floor = math.floor(target_b1)
        target_b1_ceil = math.ceil(target_b1)
        target_b2_floor = math.floor(target_b2)
        target_b2_ceil = math.ceil(target_b2)
        target_b3_floor = math.floor(target_b3)
        target_b3_ceil = math.ceil(target_b3)
        target_c1_floor = math.floor(target_c1)
        target_c1_ceil = math.ceil(target_c1)
        target_c2_floor = math.floor(target_c2)
        target_c2_ceil = math.ceil(target_c2)
        target_c3_floor = math.floor(target_c3)
        target_c3_ceil = math.ceil(target_c3)
        target_a_floor = math.floor(target_a)
        target_a_ceil = math.ceil(target_a)
        target_b_floor = math.floor(target_b)
        target_b_ceil = math.ceil(target_b)
        target_c_floor = math.floor(target_c)
        target_c_ceil = math.ceil(target_c)
        rounds = 30000
        sum_a1, sum_a2, sum_a3 = 0, 0, 0
        sum_b1, sum_b2, sum_b3 = 0, 0, 0
        sum_c1, sum_c2, sum_c3 = 0, 0, 0
        sum_a, sum_b, sum_c = 0, 0, 0
        samples = []
        for _ in range(rounds):
            if batch_operation:
                # refill samples from a fresh batch when exhausted
                if not samples:
                    samples = batch(
                        category_mapper,
                        pool,
                        sample_size,
                        sample_count
                    )
                sample = samples.pop()
            else:
                sample = single(category_mapper, pool, sample_size)
            used = set()
            for item in sample:
                used.add(item)
            # no duplicates within a sample
            assert_eq(len(sample), len(used))
            count_a1 = len(list(filter(lambda x: x[0:2] == "A1", sample)))
            count_a2 = len(list(filter(lambda x: x[0:2] == "A2", sample)))
            count_a3 = len(list(filter(lambda x: x[0:2] == "A3", sample)))
            count_b1 = len(list(filter(lambda x: x[0:2] == "B1", sample)))
            count_b2 = len(list(filter(lambda x: x[0:2] == "B2", sample)))
            count_b3 = len(list(filter(lambda x: x[0:2] == "B3", sample)))
            count_c1 = len(list(filter(lambda x: x[0:2] == "C1", sample)))
            count_c2 = len(list(filter(lambda x: x[0:2] == "C2", sample)))
            count_c3 = len(list(filter(lambda x: x[0:2] == "C3", sample)))
            count_a = count_a1 + count_a2 + count_a3
            count_b = count_b1 + count_b2 + count_b3
            count_c = count_c1 + count_c2 + count_c3
            # per-sample counts must be floor or ceiling of the target
            assert_eq(count_a + count_b + count_c, sample_size)
            assert_range(count_a1, target_a1_floor, target_a1_ceil)
            assert_range(count_a2, target_a2_floor, target_a2_ceil)
            assert_range(count_a3, target_a3_floor, target_a3_ceil)
            assert_range(count_b1, target_b1_floor, target_b1_ceil)
            assert_range(count_b2, target_b2_floor, target_b2_ceil)
            assert_range(count_b3, target_b3_floor, target_b3_ceil)
            assert_range(count_c1, target_c1_floor, target_c1_ceil)
            assert_range(count_c2, target_c2_floor, target_c2_ceil)
            assert_range(count_c3, target_c3_floor, target_c3_ceil)
            assert_range(count_a, target_a_floor, target_a_ceil)
            assert_range(count_b, target_b_floor, target_b_ceil)
            assert_range(count_c, target_c_floor, target_c_ceil)
            sum_a1 += count_a1; sum_a2 += count_a2; sum_a3 += count_a3
            sum_b1 += count_b1; sum_b2 += count_b2; sum_b3 += count_b3
            sum_c1 += count_c1; sum_c2 += count_c2; sum_c3 += count_c3
        sum_a = sum_a1 + sum_a2 + sum_a3
        sum_b = sum_b1 + sum_b2 + sum_b3
        sum_c = sum_c1 + sum_c2 + sum_c3
        # averages over all rounds must statistically match the targets
        assert_average(rounds, target_a1, sum_a1)
        assert_average(rounds, target_a2, sum_a2)
        assert_average(rounds, target_a3, sum_a3)
        assert_average(rounds, target_b1, sum_b1)
        assert_average(rounds, target_b2, sum_b2)
        assert_average(rounds, target_b3, sum_b3)
        assert_average(rounds, target_c1, sum_c1)
        assert_average(rounds, target_c2, sum_c2)
        assert_average(rounds, target_c3, sum_c3)
        assert_average(rounds, target_a, sum_a)
        assert_average(rounds, target_b, sum_b)
        assert_average(rounds, target_c, sum_c)
    print("All tests passed.")
# run the module self-test when executed as a script
if __name__ == "__main__":
    _test()