# Approximate inference on a small Bayesian network:
# rejection sampling compared with likelihood weighting.
import random
# Model: four boolean variables C, S, R and W.
# S and R each depend on C; W depends on the pair (S, R).
# CPTs
# In every table the innermost dict maps a value of the child variable to its
# probability given the parent value(s) named by the outer key, so each
# innermost dict sums to 1.

# P(C)
P_C = {True: 0.5, False: 0.5}

# P(S | C) -- outer key: value of C, inner key: value of S
P_S_given_C = {
    True: {True: 0.10, False: 0.90},   # C=T
    False: {True: 0.50, False: 0.50},  # C=F
}

# P(R | C) -- outer key: value of C, inner key: value of R
P_R_given_C = {
    True: {True: 0.80, False: 0.20},   # C=T
    False: {True: 0.20, False: 0.80},  # C=F
}

# P(W | S, R) -- outer key: tuple (S, R), inner key: value of W
P_W_given_SR = {
    (True, True): {True: 0.99, False: 0.01},
    (True, False): {True: 0.90, False: 0.10},
    (False, True): {True: 0.90, False: 0.10},
    (False, False): {True: 0.00, False: 1.00},
}


def sample_variable(prob_true):
    """Return True with probability ``prob_true``, else False.

    Args:
        prob_true: Probability of the True outcome, expected in [0, 1].

    Returns:
        bool: One Bernoulli draw.
    """
    # random.random() is uniform on [0.0, 1.0), so prob_true == 0 never
    # yields True and prob_true == 1 always does.
    return random.random() < prob_true


def get_prior_sample():
    """Generate one sample from the joint distribution P(C, S, R, W).

    Variables are drawn parents-first (C, then S and R, then W) so that each
    conditional table can be indexed by the already-sampled parent values.

    Returns:
        dict: Maps 'C', 'S', 'R' and 'W' to the sampled boolean values.
    """
    c = sample_variable(P_C[True])
    s = sample_variable(P_S_given_C[c][True])
    r = sample_variable(P_R_given_C[c][True])
    w = sample_variable(P_W_given_SR[(s, r)][True])
    return {'C': c, 'S': s, 'R': r, 'W': w}


def rejection_sampling(query_var, query_val, evidence, num_samples):
    """
    Estimates P(query_var = query_val | evidence) using Rejection Sampling.

    Draws ``num_samples`` samples from the prior, discards those that
    disagree with the evidence, and returns the fraction of the remaining
    samples in which ``query_var`` equals ``query_val``.

    Args:
        query_var: Name of the query variable (a key of the sample dict).
        query_val: Value of the query variable whose probability is wanted.
        evidence: dict of observed variables, e.g., {'S': True, 'W': True}
        num_samples: Number of prior samples to draw (before rejection).

    Returns:
        float: The estimate, or 0.0 when no sample was consistent with the
        evidence (including when ``num_samples`` is 0).
    """
    consistent_count = 0
    query_match_count = 0

    for _ in range(num_samples):
        sample = get_prior_sample()

        # Reject the sample as soon as one evidence variable disagrees.
        if any(sample[var] != val for var, val in evidence.items()):
            continue

        consistent_count += 1
        if sample[query_var] == query_val:
            query_match_count += 1

    if consistent_count == 0:
        return 0.0  # Avoid division by zero

    return query_match_count / consistent_count


def likelihood_weighting(query_var, query_val, evidence, num_samples):
    """
    Estimates P(query_var = query_val | evidence) using Likelihood Weighting.

    Each sample fixes the evidence variables to their observed values and
    samples the remaining variables parents-first. The sample's weight is the
    product of the probabilities of the evidence values given their parents.

    Args:
        query_var: Name of the query variable ('C', 'S', 'R' or 'W').
        query_val: Value of the query variable whose probability is wanted.
        evidence: dict of observed variables, e.g., {'S': True, 'W': True}
        num_samples: Number of weighted samples to generate.

    Returns:
        float: Weight accumulated for ``query_val`` divided by the total
        weight, or 0.0 when the total weight is zero (no samples, or every
        sample had zero weight).
    """
    weighted_counts = {True: 0.0, False: 0.0}

    for _ in range(num_samples):
        weight = 1.0
        sample = {}

        # Parents must be assigned before children: C, then S and R, then W.
        weight *= _fix_or_sample('C', P_C, evidence, sample)
        weight *= _fix_or_sample('S', P_S_given_C[sample['C']], evidence, sample)
        weight *= _fix_or_sample('R', P_R_given_C[sample['C']], evidence, sample)
        weight *= _fix_or_sample(
            'W', P_W_given_SR[(sample['S'], sample['R'])], evidence, sample
        )

        # Accumulate weight for the value of the query variable in this sample
        weighted_counts[sample[query_var]] += weight

    total_weight = sum(weighted_counts.values())
    if total_weight == 0:
        return 0.0

    return weighted_counts[query_val] / total_weight


def _fix_or_sample(var, dist, evidence, sample):
    """Assign ``sample[var]`` and return the weight factor it contributes.

    ``dist`` maps each value of ``var`` to its probability given the parent
    values already present in ``sample``. An evidence variable is clamped to
    its observed value and contributes that value's probability; any other
    variable is sampled from ``dist`` and contributes a factor of 1.0.
    """
    if var in evidence:
        sample[var] = evidence[var]
        return dist[sample[var]]
    sample[var] = sample_variable(dist[True])
    return 1.0


if __name__ == "__main__":
    # Compare both estimators against the exact posterior P(R=T | S=T, W=T)
    # for increasing sample counts.
    evidence = {'S': True, 'W': True}
    query_var = 'R'
    query_val = True

    # Exact value, obtained by summing out C with the CPTs defined above:
    #   P(S=T, R=T, W=T) = 0.5*0.10*0.80*0.99 + 0.5*0.50*0.20*0.99 = 0.0891
    #   P(S=T, R=F, W=T) = 0.5*0.10*0.20*0.90 + 0.5*0.50*0.80*0.90 = 0.1890
    #   P(S=T, W=T)      = 0.0891 + 0.1890                         = 0.2781
    # Numerator: 0.0891
    # Denominator: 0.2781
    exact_prob = 0.0891 / 0.2781
    print(f"Exact Probability P(R=T | S=T, W=T): {exact_prob:.6f}")
    print("-" * 60)
    print(f"{'Samples':<10} | {'Rejection Sampling':<20} | {'Likelihood Weighting':<20}")
    print("-" * 60)

    for N in [100, 1000, 10000, 100000, 1000000]:
        rs_result = rejection_sampling(query_var, query_val, evidence, N)
        lw_result = likelihood_weighting(query_var, query_val, evidence, N)
        print(f"{N:<10} | {rs_result:<20.6f} | {lw_result:<20.6f}")