Simulated Annealing
Simulated Annealing(模拟退火) 是一种基于自然物理现象的全局优化算法。它的灵感来自冶金学中的退火过程,即将材料加热到高温后逐渐冷却,以减少其内部缺陷,从而获得更稳定的结构。
在算法中,它通过在解空间中随机搜索,并结合一定的“接受次优解”的机制,逐步趋近问题的全局最优解。
核心原理:
初始解与初始温度:
- 选定一个初始解,设定一个初始温度(较高值)。
邻域搜索:
- 在当前解的邻域中随机生成一个新解。
评价与接受准则:
- 如果新解的目标函数值更优,则接受新解。
- 如果新解较差,则按一定的概率接受它,概率通常由公式控制: [ P = e^{-\Delta E / T} ]
- ( \Delta E ) 是目标函数值的差,( T ) 是当前温度。
- 随着温度降低,接受差解的概率减少。
降温过程:
- 按一定规则逐步降低温度(通常称为退火计划)。
终止条件:
- 当温度降到某一阈值或达到最大迭代次数时,停止算法。
模拟退火的特点:
优点:
- 能够跳出局部最优解,找到全局最优解或近似最优解。
- 对初始条件不敏感。
缺点:
- 收敛速度较慢。
- 参数(初始温度、降温规则等)的选择影响优化效果。
常见应用:
- 组合优化问题:
- 旅行商问题(TSP)
- 排列与调度问题
- 函数优化:
- 处理多峰函数的全局最优解搜索。
- 机器学习:
- 超参数优化。
示例
下面是一个使用 Python 实现 Simulated Annealing(模拟退火) 的简单示例。我们用一个常见的优化问题来演示:找到一个函数的全局最小值。
示例问题:
优化函数 ( f(x) = x^2 + 4\sin(5x) + 3\cos(3x) ) 的最小值。
实现代码:
python
import math
import random
import matplotlib.pyplot as plt
# 目标函数
def objective_function(x):
return x**2 + 4 * math.sin(5 * x) + 3 * math.cos(3 * x)
# 模拟退火算法
def simulated_annealing(objective, bounds, initial_temp, cooling_rate, max_iter):
# 初始化
current_x = random.uniform(bounds[0], bounds[1]) # 随机初始解
current_value = objective(current_x)
best_x = current_x
best_value = current_value
temperature = initial_temp
# 记录优化过程
history = [(current_x, current_value)]
for iteration in range(max_iter):
# 在当前解的邻域中随机生成新解
candidate_x = current_x + random.uniform(-1, 1)
# 确保新解在边界范围内
candidate_x = max(bounds[0], min(candidate_x, bounds[1]))
candidate_value = objective(candidate_x)
# 计算能量差
delta_value = candidate_value - current_value
# 决定是否接受新解
if delta_value < 0 or random.random() < math.exp(-delta_value / temperature):
current_x = candidate_x
current_value = candidate_value
# 更新最优解
if current_value < best_value:
best_x = current_x
best_value = current_value
# 降温
temperature *= cooling_rate
# 保存当前状态
history.append((current_x, current_value))
# 停止条件
if temperature < 1e-10:
break
return best_x, best_value, history
# 参数设置
bounds = [-2, 2] # 搜索范围
initial_temp = 100 # 初始温度
cooling_rate = 0.95 # 降温速率
max_iter = 1000 # 最大迭代次数
# 运行算法
best_x, best_value, history = simulated_annealing(
objective_function, bounds, initial_temp, cooling_rate, max_iter
)
# 打印结果
print(f"最佳解: x = {best_x:.4f}, f(x) = {best_value:.4f}")
# 可视化优化过程
x_values = [x for x, _ in history]
y_values = [y for _, y in history]
plt.plot(range(len(history)), y_values, label="Objective Value")
plt.xlabel("Iteration")
plt.ylabel("Objective Value")
plt.title("Simulated Annealing Optimization Process")
plt.legend()
plt.show()
代码解析:
- 目标函数:
- ( f(x) = x^2 + 4\sin(5x) + 3\cos(3x) ) 是多峰函数,用于测试优化算法的性能。
- 邻域搜索:
- 在当前解的基础上加一个随机扰动,控制范围在 ([-1, 1]) 内。
- 接受准则:
- 优于当前解直接接受;否则按一定概率接受次优解,概率由退火公式控制。
- 降温策略:
- 每次迭代温度乘以 ( cooling_rate )(如 0.95)。
实例 kaggle: Santa Claude 使用 simulated annealing 求解 kaggle 2024 Santa 问题
- 概述: 将杂乱无章的文字段落拼接成一个困惑度最低的语句
下面的代码摘录来自 kaggle 可能需要 kaggle 环境
- 安装依赖
python
print("notebook started..")
!pip install -q -U transformers --no-index --find-links /kaggle/input/hf-libraries/transformers
!pip install -q -U accelerate --no-index --find-links /kaggle/input/hf-libraries/accelerate
!pip install -q -U bitsandbytes --no-index --find-links /kaggle/input/hf-libraries/bitsandbytes
print("pip installs done!")
import numpy as np
import pandas as pd
import random
import math
- 加载评分器(困惑度计算模型)
python
from metric import PerplexityCalculator
#gemma-2-9b (competition scoring metric)
scorer = PerplexityCalculator('/kaggle/input/gemma-2/transformers/gemma-2-9b/2')
- 加载数据(保存上次运行结果)
python
# Set to false to do a "clean" run
reuse_last_submission = True
if reuse_last_submission:
samples = pd.read_csv("/kaggle/input/santa-claude-output/submission.csv")
else:
samples = pd.read_csv("/kaggle/input/santa-2024/sample_submission.csv")
- 初始化评分
python
# Get actual mean value
scores = []
for row in range(len(samples)):
score = scorer.get_perplexity(samples.iloc[row].text)
print(samples.iloc[row].text)
print(f"Score: {score:.2f}\n")
scores.append(score)
print(f"Starting mean score: {np.mean(scores):.2f}")
- 开始 simulated annealing
python
# ">" new version with score improved
# "<" new version with poorer score
# "-" maintaining existing version
temp_start = 10.0 #how high a temperature we start with (prior 10)
temp_end = 0.5 #final temperature (prior 0.2)
cooling_rate = 0.95 #how quick we cool each time we drop temp (prior 0.95)
steps_per_temp = 5 #steps at each temperature (prior 20) <---- Increase this for a longer run (20 steps is about 3 hours)
def simulated_annealing_optimize(text: str, temp_start=temp_start, temp_end=temp_end, cooling_rate=cooling_rate, steps_per_temp=steps_per_temp, verbose=False):
"""Optimize word sequence using simulated annealing, handling NaN scores by randomizing.
Args:
text: Input string of space-separated words to optimize
temp_start: Starting temperature - higher means more random exploration
temp_end: Ending temperature - lower means more selective at end
cooling_rate: How fast temperature decreases each step
steps_per_temp: How many swaps to try at each temperature
verbose: Whether to print detailed progress
"""
words = text.split()
current = words.copy()
current_score = scorer.get_perplexity(' '.join(current))
# Handling any NaNs...
if math.isnan(current_score):
# Keep shuffling until we find a valid sequence
while True:
current = words.copy()
random.shuffle(current)
current_score = scorer.get_perplexity(' '.join(current))
if not math.isnan(current_score):
break
best = current.copy()
best_score = current_score
temp = temp_start
print(f"Start Temperature: {temp:.2f}, Initial score: {current_score:.2f}")
# Main annealing loop - keep trying until we've cooled down enough
while temp > temp_end:
for _ in range(steps_per_temp): # Do multiple attempts at each temperature
# Try improving sequence by swapping random pairs of words
i, j = random.sample(range(len(words)), 2)
neighbor = current.copy()
neighbor[i], neighbor[j] = neighbor[j], neighbor[i]
# Get score for this arrangement, skip if invalid
neighbor_score = scorer.get_perplexity(' '.join(neighbor))
if math.isnan(neighbor_score):
continue
# Accept better scores, sometimes accept worse ones based on temperature
delta = neighbor_score - current_score
if delta < 0 or random.random() < math.exp(-delta / temp):
current = neighbor
current_score = neighbor_score
if current_score < best_score:
best = current.copy()
best_score = current_score
print(">", end="")
else: print("<", end="")
else:print("-", end="")
# Reduce temperature according to cooling schedule (AFTER all steps at this temperature)
temp *= cooling_rate
if verbose: print(f"\nTemperature: {temp:.2f}, Current score: {current_score:.2f}")
print(f"\nFinal score: {best_score:.2f}")
return ' '.join(best), best_score
#uncomment for test run
#text = samples.iloc[4].text
#simulated_annealing_optimize(text, verbose=True)
- 实际计算 scores
python
submission = pd.DataFrame(columns=['id', 'text'])
scores = []
# Process each sample
for idx, row in samples.iterrows():
print(f"\nProcessing sample {idx}...")
optimized, score = simulated_annealing_optimize(row.text)
scores.append(score)
# Add to submission dataframe
submission.loc[idx] = {
'id': row.id,
'text': optimized
}
print("-" * 50)
# Print summary statistics
print("\nScore Summary:")
print(f"Submission mean score: {np.mean(scores):.2f}")
# Save to CSV
submission.to_csv("submission.csv", index=False)
print("\nSubmission file created!")
submission
代码 chatgpt 解读
这个代码实现了一个基于模拟退火(Simulated Annealing)的优化算法,目标是优化输入文本中单词的排列顺序,使得排列后的句子得分(由 scorer.get_perplexity()
函数提供)最优。
代码解析
1. 参数与配置
temp_start
:初始温度,值越高,越倾向于随机探索。temp_end
:最终温度,达到该值时算法停止,值越低,搜索更精确。cooling_rate
:每次降温时的比例(0.95表示每轮温度减小5%)。steps_per_temp
:每个温度下尝试的解的数量。值越高,优化过程越长,但可能找到更好的解。
2. 函数参数
text
:输入字符串,包含以空格分隔的单词。verbose
:是否打印详细的中间过程。
3. 函数主要逻辑
Step 1: 数据准备
python
words = text.split()
current = words.copy()
current_score = scorer.get_perplexity(' '.join(current))
- 输入字符串被分割成单词列表。
current
是当前的单词排列。current_score
是当前排列的得分,使用scorer.get_perplexity()
函数计算。
Step 2: 处理 NaN
python
if math.isnan(current_score):
# Shuffle until a valid score is found
while True:
current = words.copy()
random.shuffle(current)
current_score = scorer.get_perplexity(' '.join(current))
if not math.isnan(current_score):
break
- 如果当前排列的得分为
NaN
(无效),就随机打乱单词顺序,直到找到一个有效得分。
Step 3: 初始化最优解
python
best = current.copy()
best_score = current_score
temp = temp_start
- 初始化当前解为最佳解
best
。 - 记录当前最佳得分
best_score
。 - 设置初始温度。
Step 4: 模拟退火主循环
python
while temp > temp_end:
for _ in range(steps_per_temp): # 多次尝试改进解
i, j = random.sample(range(len(words)), 2) # 随机选两个单词
neighbor = current.copy()
neighbor[i], neighbor[j] = neighbor[j], neighbor[i] # 交换两个单词
- 当温度高于
temp_end
时继续循环。 - 每个温度下尝试多次邻域搜索(根据
steps_per_temp
)。 - 随机选取两个单词交换位置,生成一个“邻域解”
neighbor
。
Step 5: 评估新解
python
neighbor_score = scorer.get_perplexity(' '.join(neighbor))
if math.isnan(neighbor_score):
continue
- 计算新解的得分
neighbor_score
。 - 如果得分为
NaN
,跳过此次尝试。
Step 6: 接受准则
python
delta = neighbor_score - current_score
if delta < 0 or random.random() < math.exp(-delta / temp):
current = neighbor
current_score = neighbor_score
if current_score < best_score:
best = current.copy()
best_score = current_score
print(">", end="")
else: print("<", end="")
else: print("-", end="")
- 优于当前解(
delta < 0
)直接接受。 - 次优解 有一定概率接受,概率由温度和能量差决定: [ P = e^{-\Delta / T} ]
- 随着温度下降,接受次优解的概率降低。
- 如果新解优于历史最优解,更新最优解。
Step 7: 降温
python
temp *= cooling_rate
- 按照降温速率减少当前温度。
- 当温度低于
temp_end
时停止。
Step 8: 输出结果
python
print(f"\nFinal score: {best_score:.2f}")
return ' '.join(best), best_score
- 返回最佳单词排列和对应得分。
细节说明
“>”, “<”, “-”的输出:
>
:表示新解得分优于当前最优解。<
:表示新解被接受,但得分不如当前最优解。-
:表示新解未被接受。
随机性与全局优化:
- 初始高温使算法能探索更多随机排列,避免陷入局部最优。
- 降温使算法逐渐集中在更优的解附近。
优化目标:
- 目标是通过调整单词顺序,使得句子得分(由
scorer.get_perplexity
计算)最低。
- 目标是通过调整单词顺序,使得句子得分(由
性能调节:
- 增大
steps_per_temp
和减小cooling_rate
会提高解的质量,但运行时间增加。
- 增大
改进建议
- 并行优化:
- 可以通过多线程或多进程同时运行多个模拟退火实例,提高找到全局最优解的概率。
- 动态降温策略:
- 根据得分变化动态调整降温速率,使优化更灵活。
- 更丰富的邻域操作:
- 除交换两个单词,还可尝试更多复杂的邻域变化(如逆序一段)。