Coverage for src/sensai/local_search.py: 28%

384 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-13 22:17 +0000

1import collections 

2import logging 

3import math 

4import random 

5import time 

6from abc import ABC, abstractmethod 

7from typing import Optional, Tuple, Callable, Type, Sequence, TypeVar, Generic 

8 

9import numpy as np 

10import pandas as pd 

11from matplotlib import pyplot as plt 

12 

13from .util.aggregation import RelativeFrequencyCounter 

14 

15log = logging.getLogger(__name__) 

16 

17 

18class SATemperatureSchedule(ABC): 

19 """ 

20 Describes how temperature changes as the annealing process goes on. 

21 The function maps a degree of completion (in the interval from 0 to 1) to a temperature value. 

22 """ 

23 

24 @abstractmethod 

25 def temperature(self, degree_of_completion: float) -> float: 

26 """ 

27 Computes the temperature for a degree of completion in [0, 1] 

28 :param degree_of_completion: the degree to which the simulated annealing process has completed in [0, 1] 

29 :return: the temperature 

30 """ 

31 pass 

32 

33 def probability(self, degree_of_completion: float, cost_delta: float) -> Tuple[float, float]: 

34 T = self.temperature(degree_of_completion) 

35 p = math.exp(-cost_delta / T) if T > 0.0 else 0.0 

36 return p, T 

37 

38 @abstractmethod 

39 def _get_params(self): 

40 pass 

41 

42 def __str__(self): 

43 return f"{self.__class__.__name__}{self._get_params()}" 

44 

45 

46class SATemperatureScheduleFixed(SATemperatureSchedule): 

47 """A schedule with a constant temperature""" 

48 def __init__(self, t): 

49 self.t = t 

50 

51 def temperature(self, degree_of_completion): 

52 return self.t 

53 

54 def _get_params(self): 

55 return {"fixedTemp": self.t} 

56 

57 

58class SATemperatureScheduleExponential(SATemperatureSchedule): 

59 """ A temperature schedule for simulated annealing where the temperature drops exponentially. """ 

60 

61 def __init__(self, t0, t1, exponent_factor): 

62 """ 

63 :param t0: temperature at the beginning (degree of completion 0) 

64 :param t1: temperature at the end (degree of completion 1), which must be smaller than t0 and larger than 0 

65 :param exponent_factor: factor with which to multiply the exponent (the larger the factor, the faster the temperature drops) 

66 """ 

67 super().__init__() 

68 if t1 > t0 or t0 <= 0: 

69 raise ValueError("Inadmissible temperatures given.") 

70 

71 if exponent_factor < 1: 

72 raise ValueError("The exponent factor cannot be less than 1.") 

73 

74 self.t0 = t0 

75 self.t1 = t1 

76 self.t_drop = t0 - t1 

77 self.exponent_factor = exponent_factor 

78 

79 def temperature(self, degree_of_completion): 

80 return self.t0 - self.t_drop * (1.0 - math.exp(-self.exponent_factor * degree_of_completion)) 

81 

82 def _get_params(self): 

83 return {"t0": self.t0, "t1": self.t1, "exp_factor": self.exponent_factor} 

84 

85 

86def reverse_sigmoid(degree_of_completion, v0, v1, steepness, mid_degree=0.5): 

87 return v0 - (v0-v1) / (1.0 + math.exp(-steepness * (degree_of_completion - mid_degree))) 

88 

89 

90class SATemperatureScheduleReverseSigmoid(SATemperatureSchedule): 

91 """ 

92 A temperature schedule for simulated annealing where the temperature drops in a reverse sigmoid curve (based on the logistic function) 

93 """ 

94 

95 def __init__(self, t0, t1, steepness, mid_degree=0.5): 

96 """ 

97 :param t0: temperature at the beginning (degree of completion 0) 

98 :param t1: temperature at the end (degree of completion 1), which must be smaller than t0 and larger than 0 

99 :param steepness: Factor which roughly corresponds to the (negative) slope at the inflection point 0.5. 

100 For the sigmoid shape to be sufficiently pronounced, a value of at least 5 is recommended (the closer the value is to 1, 

101 the more the curve approaches a linear decay). 

102 :param mid_degree: the degree of completion at which the function shall return the temperature (t0+t1)/2 

103 """ 

104 super().__init__() 

105 if t1 > t0 or t0 <= 0: 

106 raise ValueError("Inadmissible temperatures given.") 

107 if mid_degree < 0 or mid_degree > 1: 

108 raise Exception("Mid-degree must be between 0 and 1") 

109 

110 self.t0 = t0 

111 self.t1 = t1 

112 self.steepness = steepness 

113 self.mid_degree = mid_degree 

114 

115 def temperature(self, degree_of_completion): 

116 return reverse_sigmoid(degree_of_completion, self.t0, self.t1, self.steepness, self.mid_degree) 

117 

118 def _get_params(self): 

119 return {"t0": self.t0, "t1": self.t1, "steepness": self.steepness, "mid_degree": self.mid_degree} 

120 

121 

122class SATemperatureScheduleReverseSigmoidSymmetric(SATemperatureScheduleReverseSigmoid): 

123 """ 

124 A variant of the logistic schedule with a reverse sigmoid shape, where the probability of acceptance for a given assumed positive 

125 cost delta is symmetric. "Symmetric" here means that half the schedule is to be above 0.5 and half of it below 0.5. 

126 """ 

127 def __init__(self, t0, t1, steepness, cost_delta_for_symmetry): 

128 """ 

129 :param t0: temperature at the beginning (degree of completion 0) 

130 :param t1: temperature at the end (degree of completion 1), which must be smaller than t0 and larger than 0 

131 :param steepness: Factor which roughly corresponds to the (negative) slope at the inflection point 0.5. 

132 For the sigmoid shape to be sufficiently pronounced, a value of at least 8 is recommended (the closer the value is to 1, 

133 the more the curve approaches a linear decay). 

134 :param cost_delta_for_symmetry: the (positive) cost delta value which the curve shall be "symmetric" 

135 """ 

136 k = steepness 

137 cdelta = cost_delta_for_symmetry 

138 mid_degree = -(2*math.log(-(13614799*t0-19642003*cdelta)/(13614799*t1-19642003*cdelta))-k)/(2*k) 

139 super().__init__(t0, t1, steepness, mid_degree) 

140 

141 

142class SATemperatureSchedulePower(SATemperatureSchedule): 

143 """ 

144 A temperature schedule for simulated annealing where the temperature drops with powers of the degree of completion d, 

145 i.e. the temperature drop is proportional to d to the power of e for a given exponent e. 

146 """ 

147 

148 def __init__(self, t0, t1, exponent): 

149 """ 

150 :param t0: temperature at the beginning (degree of completion 0) 

151 :param t1: temperature at the end (degree of completion 1), which must be smaller than t0 and larger than 0 

152 :param exponent: the exponent of the power function 

153 """ 

154 super().__init__() 

155 if t1 > t0 or t0 <= 0: 

156 raise ValueError("Inadmissible temperatures given.") 

157 

158 self.t0 = t0 

159 self.t1 = t1 

160 self.tDrop = t0 - t1 

161 self.exponent = exponent 

162 

163 def temperature(self, degree_of_completion): 

164 return self.t0 - self.tDrop * (math.pow(degree_of_completion, self.exponent)) 

165 

166 def _get_params(self): 

167 return {"t0": self.t0, "t1": self.t1, "exponent": self.exponent} 

168 

169 

170class SAProbabilityFunction(ABC): 

171 def __init__(self, **params_dict): 

172 self.params_dict = params_dict 

173 

174 def __str__(self): 

175 return f"{self.__class__.__name__}{self.params_dict}" 

176 

177 @abstractmethod 

178 def __call__(self, degree_of_completion): 

179 pass 

180 

181 

182class SAProbabilityFunctionLinear(SAProbabilityFunction): 

183 """A probability function where probabilities decay linearly""" 

184 def __init__(self, p0=1.0, p1=0.0): 

185 super().__init__(p0=p0, p1=p1) 

186 self.p0 = p0 

187 self.p1 = p1 

188 

189 def __call__(self, degree_of_completion): 

190 return self.p0 - degree_of_completion * (self.p0 - self.p1) 

191 

192 

193class SAProbabilityFunctionReverseSigmoid(SAProbabilityFunction): 

194 """A probability function where probabilities decay in a reverse sigmoid shape""" 

195 def __init__(self, p0=1.0, p1=0.0, steepness=10): 

196 """ 

197 :param p0: the probability at the beginning (degree of completion 0) 

198 :param p1: the probability at the end (degree of completion 1) 

199 :param steepness: the steepness of the sigmoid curve 

200 """ 

201 super().__init__(p0=p0, p1=p1, steepness=steepness) 

202 self.p0 = p0 

203 self.p1 = p1 

204 self.steepness = steepness 

205 

206 def __call__(self, degree_of_completion): 

207 return reverse_sigmoid(degree_of_completion, self.p0, self.p1, self.steepness) 

208 

209 

210class SAProbabilityFunctionConstant(SAProbabilityFunction): 

211 """A constant probability function (which returns the same probability for all degrees of completion)""" 

212 def __init__(self, p): 

213 super().__init__(p=p) 

214 self.p = p 

215 

216 def __call__(self, degree_of_completion): 

217 return self.p 

218 

219 

220class SAProbabilitySchedule(SATemperatureSchedule): 

221 """ 

222 A temperature schedule where temperatures are derived from a probability schedule that is to apply to a reference cost delta, which 

223 is either given or is computed from observed values (the latter resulting in an adaptive schedule). 

224 It converts a function that returns probabilities for degrees of completion into a corresponding temperature schedule. 

225 """ 

226 def __init__(self, reference_cost_delta: Optional[float], probability_function: SAProbabilityFunction): 

227 """ 

228 Creates a temperature schedule for a reference cost delta (which can also be computed automatically from observed data) 

229 and probability function: 

230 The schedule will return temperatures such that for referenceCostDelta, the probability of acceptance of a move at 

231 degree of completion d in [0,1] will be probabilityFunction(d). 

232 

233 :param reference_cost_delta: the (positive) cost delta for which the probability function is to apply; 

234 if None, adaptively determine it from the empirical mean 

235 :param probability_function: a function which maps degrees of completion in [0,1] to probabilities in [0,1] 

236 """ 

237 self.adaptive = reference_cost_delta is None 

238 self.referenceCostDelta = reference_cost_delta 

239 self.probabilityFunction = probability_function 

240 self.paramsDict = { 

241 "refCostDelta": reference_cost_delta if reference_cost_delta is not None else "adaptive", 

242 "probabilityFunction": str(probability_function)} 

243 

244 # helper variables for adaptive mode 

245 self._costDeltaSum = 0 

246 self._costDeltaCount = 0 

247 

248 def temperature(self, degree_of_completion): 

249 if self.adaptive and self._costDeltaCount == 0: 

250 raise Exception("Cannot generate a temperature from an adaptive schedule without any previous cost-delta samples") 

251 p = self.probabilityFunction(degree_of_completion) 

252 if p == 0.0: 

253 return 0 

254 else: 

255 return -self.referenceCostDelta / math.log(p) 

256 

257 def probability(self, degree_of_completion, cost_delta): 

258 if self.adaptive: 

259 self._costDeltaSum += cost_delta 

260 self._costDeltaCount += 1 

261 self.referenceCostDelta = max(self._costDeltaSum / self._costDeltaCount, 1e-10) 

262 return super().probability(degree_of_completion, cost_delta) 

263 

264 def _get_params(self): 

265 return self.paramsDict 

266 

267 

268class SACostValue(ABC): 

269 """Representation of an immutable cost value""" 

270 

271 @abstractmethod 

272 def value(self): 

273 """Returns the numeric cost value""" 

274 pass 

275 

276 @abstractmethod 

277 def add(self, other) -> 'SACostValue': 

278 pass 

279 

280 

281class SACostValueNumeric(SACostValue): 

282 def __init__(self, scalar): 

283 self._value = scalar 

284 

285 def __str__(self): 

286 return str(self.value()) 

287 

288 def value(self): 

289 return self._value 

290 

291 def add(self, other: 'SACostValueNumeric') -> 'SACostValueNumeric': 

292 return SACostValueNumeric(self._value + other._value) 

293 

294 

295class SAState(ABC): 

296 """Represents the state/variable assignment during a simulated annealing process""" 

297 

298 def __init__(self, r: random.Random): 

299 self.r = r 

300 self.cost = self.compute_cost_value() 

301 

302 @abstractmethod 

303 def compute_cost_value(self) -> SACostValue: 

304 """Computes the costs of this state (from scratch)""" 

305 pass 

306 

307 @abstractmethod 

308 def get_state_representation(self): 

309 """ 

310 Returns a compact state representation (for the purpose of archiving a hitherto best result), which can later be 

311 applied via applyStateRepresentation. 

312 

313 :return: a compact state representation of some sort 

314 """ 

315 pass 

316 

317 @abstractmethod 

318 def apply_state_representation(self, representation): 

319 """ 

320 Applies the given state representation (as returned via `getStateRepresentation`) in order for the optimisation result to 

321 be obtained by the user. 

322 Note that the function does not necessarily need to change this state to reflect the representation, as its sole 

323 purpose is for the optimsation result to be obtainable thereafter (it is not used during the optimisation process as such). 

324 

325 :param representation: a representation as returned by `getStateRepresentation` 

326 """ 

327 pass 

328 

329 

330TSAState = TypeVar("TSAState", bound=SAState) 

331 

332 

333class SAOperator(Generic[TSAState]): 

334 """ 

335 An operator which, when applied with appropriately chosen parameters, can transform a state into another 

336 state during simulated annealing 

337 """ 

338 

339 def __init__(self, state: TSAState): 

340 """ 

341 :param state: the state to which the operator is applied 

342 """ 

343 self.state = state 

344 

345 def apply_cost_change(self, cost_delta: SACostValue): 

346 """ 

347 Applies the cost change to the state given at construction 

348 

349 :param cost_delta: the cost change to apply 

350 """ 

351 self.state.cost = self.state.cost.add(cost_delta) 

352 

353 @abstractmethod 

354 def apply_state_change(self, *params): 

355 """ 

356 Applies the operator to the state, i.e. it makes the changes to the state only 

357 (and does not update the associated costs) 

358 

359 :param params: the parameters with which the operator is to be applied 

360 :return: 

361 """ 

362 pass 

363 

364 def apply(self, params: Tuple, cost_delta: SACostValue): 

365 """ 

366 Applies the operator to the state given at construction, changing the state and updating the costs appropriately 

367 

368 :param params: the parameters with which the operator is to be applied 

369 :param cost_delta: the cost change that results from the application 

370 :return: 

371 """ 

372 self.apply_cost_change(cost_delta) 

373 self.apply_state_change(*params) 

374 

375 @abstractmethod 

376 def cost_delta(self, *params) -> SACostValue: 

377 """ 

378 Computes the cost change that would apply when applying this operator with the given parameters 

379 

380 :param params: an arbitrary list of parameters (specific to the concrete operator) 

381 :return: 

382 """ 

383 pass 

384 

385 @abstractmethod 

386 def choose_params(self) -> Optional[Tuple[Tuple, Optional[SACostValue]]]: 

387 """ 

388 Chooses parameters for the application of the operator (e.g. randomly or greedily). 

389 

390 :return: a tuple (params, costValue) or None if no suitable parameters are found, where params is the list of chosen 

391 parameters and costValue is either an instance of CostValue or None if the costs have not been computed. 

392 """ 

393 pass 

394 

395 

396class SAChain(Generic[TSAState]): 

397 """Manages the progression of one state during simulated annealing""" 

398 

399 log = log.getChild(__qualname__) 

400 

401 def __init__(self, 

402 state_factory: Callable[[random.Random], TSAState], 

403 schedule: SATemperatureSchedule, 

404 ops_and_weights: Sequence[Tuple[Callable[[TSAState], SAOperator[TSAState]], float]], 

405 random_seed, 

406 collect_stats=False): 

407 self.schedule = schedule 

408 self.r = random.Random(random_seed) 

409 self.state = state_factory(self.r) 

410 self.collect_stats = collect_stats 

411 operators, weights = zip(*ops_and_weights) 

412 cum_weights, s = [], 0 

413 for weight in weights: 

414 s += weight 

415 cum_weights.append(s) 

416 self.ops = [cons(self.state) for cons in operators] 

417 self.op_cum_weights = cum_weights 

418 self.steps_taken = 0 

419 self.count_none_params = 0 

420 self.count_best_updates = -1 

421 self.best_cost = None 

422 self.best_state_repr = None 

423 self.logged_series = collections.defaultdict(lambda: []) 

424 self._update_best_state() 

425 

426 if self.collect_stats: 

427 self.operator_inapplicability_counters = {} 

428 for op in self.ops: 

429 self.operator_inapplicability_counters[op] = RelativeFrequencyCounter() 

430 

431 def _update_best_state(self): 

432 cost = self.state.cost 

433 if self.best_cost is None or cost.value() < self.best_cost.value(): 

434 self.best_cost = cost 

435 self.best_state_repr = self.state.get_state_representation() 

436 self.count_best_updates += 1 

437 

438 def step(self, degree_of_completion): 

439 r = self.r 

440 

441 # make move 

442 op = r.choices(self.ops, cum_weights=self.op_cum_weights, k=1)[0] 

443 param_choice = op.choose_params() 

444 if param_choice is None: 

445 self.count_none_params += 1 

446 else: 

447 params, cost_change = param_choice 

448 if cost_change is None: 

449 cost_change = op.cost_delta(*params) 

450 if cost_change.value() < 0: 

451 make_move = True 

452 else: 

453 cost_change_value = cost_change.value() 

454 p, T = self.schedule.probability(degree_of_completion, cost_change_value) 

455 make_move = r.random() <= p 

456 self.log.debug(f'p: {p}, T: {T}, costDelta: {cost_change_value}, move: {make_move}') 

457 if self.collect_stats: 

458 self.logged_series["temperatures"].append(T) 

459 self.logged_series["probabilities"].append(p) 

460 if make_move: 

461 op.apply(params, cost_change) 

462 self._update_best_state() 

463 if self.collect_stats: 

464 self.logged_series["costDeltas"].append(cost_change.value()) 

465 if self.collect_stats: 

466 self.logged_series["bestCostValues"].append(self.best_cost.value()) 

467 self.logged_series["costValues"].append(self.state.cost.value()) 

468 self.operator_inapplicability_counters[op].count(param_choice is None) 

469 

470 self.steps_taken += 1 

471 

472 if self.log.isEnabledFor(logging.DEBUG): 

473 self.log.debug(f"Step {self.steps_taken}: cost={self.state.cost}; best cost={self.best_cost}") 

474 

475 def log_stats(self): 

476 if self.collect_stats: 

477 stats = {"useless moves total (None params)": f"{self.count_none_params}/{self.steps_taken}"} 

478 for op, counter in self.operator_inapplicability_counters.items(): 

479 stats[f"useless moves of {op}"] = str(counter) 

480 logged_cost_deltas = self.logged_series["costDeltas"] 

481 if logged_cost_deltas: 

482 stats["mean cost delta"] = f"{np.mean(logged_cost_deltas):.3f} +- { np.std(logged_cost_deltas):.3f}" 

483 abs_cost_deltas = np.abs(logged_cost_deltas) 

484 stats["mean absolute cost delta"] = f"{np.mean(abs_cost_deltas):.3f} +- {np.std(abs_cost_deltas):.3f}" 

485 positive_cost_deltas = [cd for cd in logged_cost_deltas if cd > 0] 

486 if positive_cost_deltas: 

487 stats["positive cost delta"] = f"mean={np.mean(positive_cost_deltas):.3f} +- {np.std(positive_cost_deltas):.3f}," \ 

488 f" max={np.max(positive_cost_deltas):.3f}" 

489 stats_join = "\n " 

490 self.log.info(f"Stats: {stats_join.join([key + ': ' + value for (key, value) in stats.items()])}") 

491 self.log.info(f"Best solution has {self.best_cost} after {self.count_best_updates} updates of best state") 

492 

493 def apply_best_state(self): 

494 """Applies the best state representation found in this chain to the chain's state""" 

495 self.state.apply_state_representation(self.best_state_repr) 

496 self.state.cost = self.best_cost 

497 

498 def plot_series(self, series_name): 

499 """ 

500 Plots one of the logged series 

501 

502 :param series_name: the name of the series (see getSeries) 

503 """ 

504 series = self.get_series(series_name) 

505 plt.figure() 

506 series.plot(title=series_name) 

507 

508 def get_series(self, series_name): 

509 """ 

510 Gets one of the logged series (for collectStats==True) 

511 

512 :param series_name: name of the series: one of "temperatures", "probabilities", "costDeltas", "bestCostValues", "costValues 

513 """ 

514 if not self.collect_stats: 

515 raise Exception("No stats were collected") 

516 if series_name not in self.logged_series: 

517 raise Exception("Unknown series") 

518 return pd.Series(self.logged_series[series_name]) 

519 

520 

521class SimulatedAnnealing(Generic[TSAState]): 

522 """ 

523 The simulated annealing algorithm for discrete optimisation (cost minimisation) 

524 """ 

525 

526 log = log.getChild(__qualname__) 

527 

528 def __init__(self, 

529 schedule_factory: Callable[[], SATemperatureSchedule], 

530 ops_and_weights: Sequence[Tuple[Callable[[TSAState], SAOperator[TSAState]], float]], 

531 max_steps: int = None, 

532 duration: float = None, 

533 random_seed=42, 

534 collect_stats=False): 

535 """ 

536 :param schedule_factory: a factory for the creation of the temperature schedule for the annealing process 

537 :param ops_and_weights: a list of operator factories with associated weights, where weights are to indicate the (non-normalised) 

538 probability of choosing the associated operator 

539 :param max_steps: the number of steps for which to run the optimisation; may be None (if not given, duration must be provided) 

540 :param duration: the duration, in seconds, for which to run the optimisation; may be None (if not given, maxSteps must be provided) 

541 :param random_seed: the random seed to use for all random choices 

542 :param collect_stats: flag indicating whether to collect additional statics which will be logged 

543 """ 

544 if max_steps is not None and max_steps <= 0: 

545 raise ValueError("The number of iterations should be greater than 0.") 

546 if max_steps is None and duration is None or (max_steps is not None and duration is not None): 

547 raise ValueError("Exactly one of {maxSteps, duration} must be specified.") 

548 if duration is not None and duration <= 0: 

549 raise ValueError("Duration must be greater than 0 if provided") 

550 self.scheduleFactory = schedule_factory 

551 self.max_steps = max_steps 

552 self.duration = duration 

553 self.randomSeed = random_seed 

554 self.opsAndWeights = ops_and_weights 

555 self.collect_stats = collect_stats 

556 self._chain = None 

557 

558 def optimise(self, state_factory: Callable[[random.Random], TSAState]) -> TSAState: 

559 """ 

560 Applies the annealing process, starting with a state created via the given factory. 

561 

562 :param state_factory: the factory with which to create the (initial) state 

563 :return: the state with the least-cost representation found during the optimisation applied 

564 """ 

565 chain = SAChain(state_factory, self.scheduleFactory(), ops_and_weights=self.opsAndWeights, random_seed=self.randomSeed, 

566 collect_stats=self.collect_stats) 

567 self.log.info(f"Running simulated annealing with {len(self.opsAndWeights)} operators for " 

568 f"{'%d steps' % self.max_steps if self.max_steps is not None else '%d seconds' % self.duration} ...") 

569 start_time = time.time() 

570 while True: 

571 time_elapsed = time.time() - start_time 

572 if (self.max_steps is not None and chain.steps_taken >= self.max_steps) or (self.duration is not None and time_elapsed >= self.duration): 

573 break 

574 if self.max_steps is not None: 

575 degree_of_completion = chain.steps_taken / self.max_steps 

576 else: 

577 degree_of_completion = time_elapsed / self.duration 

578 chain.step(degree_of_completion) 

579 self.log.info(f"Simulated annealing completed after {time.time()-start_time:.1f} seconds, {chain.steps_taken} steps") 

580 chain.log_stats() 

581 chain.apply_best_state() 

582 if self.collect_stats: 

583 self._chain = chain 

584 return chain.state 

585 

586 def get_chain(self) -> Optional[SAChain[TSAState]]: 

587 """ 

588 Gets the chain used by the most recently completed application (optimise call) of this object 

589 for the case where stats collection was enabled; the chain then contains relevant series and may be used 

590 to generate plots. If stats collection was not enabled, returns None. 

591 """ 

592 return self._chain 

593 

594 

595class ParallelTempering(Generic[TSAState]): 

596 """ 

597 The parallel tempering algorithm for discrete optimisation (cost minimisation) 

598 """ 

599 

600 log = log.getChild(__qualname__) 

601 

602 def __init__(self, 

603 num_chains, 

604 ops_and_weights: Sequence[Tuple[Callable[[TSAState], SAOperator[TSAState]], float]], 

605 schedule: SATemperatureSchedule = None, 

606 probability_function: SAProbabilityFunction = None, 

607 max_steps: int = None, 

608 duration: float = None, 

609 random_seed=42, 

610 log_cost_progression=False): 

611 """ 

612 Creates a parallel tempering optimiser with the given number of chains and operators for each chain. 

613 To determine the schedule to use for each chain, either schedule or probabilityFunction must be provided. 

614 It is usually more robust to use adaptive schedules and therefore to provide probabilityFunction. 

615 

616 :param num_chains: the number of parallel chains 

617 :param ops_and_weights: a list of operators with associated weights (which are to indicate the non-normalised probability of 

618 choosing the associated operator) 

619 :param schedule: the temperature schedule from which numChains temperatures of chains are drawn (using equidistant degrees of 

620 completion); if None, must provide probabilityFunction 

621 :param probability_function: the probability function from which numChains probabilities for adaptive probability schedules, each 

622 using a constant probability, are to be drawn; if None, must provide schedule 

623 :param max_steps: the number of steps for which to run the optimisation; may be None (if not given, duration must be provided) 

624 :param duration: the duration, in seconds, for which to run the optimisation; may be None (if not given, maxSteps must be provided) 

625 :param random_seed: the random seed to use for all random choices 

626 :param log_cost_progression: whether to log cost progression of all chains (such that it can be plotted after the fact via 

627 plotCostProgression) 

628 """ 

629 if max_steps is not None and max_steps <= 0: 

630 raise ValueError("The number of iterations should be greater than 0.") 

631 if (max_steps is None and duration is None) or (max_steps is not None and duration is not None): 

632 raise ValueError("Exactly one of {maxSteps, duration} must be specified.") 

633 if duration is not None and duration <= 0: 

634 raise ValueError("duration should be greater than 0 if provided.") 

635 if num_chains < 2: 

636 raise ValueError("Number of chains must be at least 2.") 

637 if (schedule is None and probability_function is None) or (schedule is not None and probability_function is not None): 

638 raise ValueError("Exactly one of {schedule, probabilityFunction} must be given") 

639 self.max_steps = max_steps 

640 self.duration = duration 

641 self.random_seed = random_seed 

642 self.num_chains = num_chains 

643 self.base_schedule = schedule 

644 self.base_probability_function = probability_function 

645 self.ops_and_weights = ops_and_weights 

646 self.log_cost_progression = log_cost_progression 

647 

648 # transient members 

649 self._cost_progressions = None 

650 self._schedule_param_strings = None 

651 

652 def _create_schedules(self): 

653 degree_step = 1.0 / (self.num_chains - 1) 

654 degrees_of_completion = [i*degree_step for i in range(self.num_chains)] 

655 if self.base_schedule is not None: 

656 # create schedules with fixed temperatures taken from base schedule 

657 temperatures = [self.base_schedule.temperature(d) for d in degrees_of_completion] 

658 self._schedule_param_strings = ["T=%.2f" % t for t in temperatures] 

659 return [SATemperatureScheduleFixed(t) for t in temperatures] 

660 else: 

661 # create adaptive probability schedules based on probabilities taken from base probability function 

662 probabilities = [self.base_probability_function(d) for d in degrees_of_completion] 

663 self._schedule_param_strings = ["p=%.3f" % p for p in probabilities] 

664 return [SAProbabilitySchedule(None, SAProbabilityFunctionConstant(p)) for p in probabilities] 

665 

666 def optimise(self, state_factory: Callable[[random.Random], SAState]) -> SAState: 

667 """ 

668 Applies the optimisation process, starting, in each chain, with a state created via the given factory. 

669 

670 :param state_factory: the factory with which to create the states for all chains 

671 :return: the state with the least-cost representation found during the optimisation (among all parallel chains) applied 

672 """ 

673 self.log.info(f"Running parallel tempering with {self.num_chains} chains, {len(self.ops_and_weights)} operators for " 

674 f"{'%d steps' % self.max_steps if self.max_steps is not None else '%d seconds' % self.duration} ...") 

675 

676 r = random.Random(self.random_seed) 

677 chains = [] 

678 cost_progressions = [] 

679 for i, schedule in enumerate(self._create_schedules(), start=1): 

680 self.log.info(f"Chain {i} uses {schedule}") 

681 chains.append(SAChain(state_factory, schedule, ops_and_weights=self.ops_and_weights, random_seed=r.randint(0, 1000))) 

682 cost_progressions.append([]) 

683 

684 start_time = time.time() 

685 step = 0 

686 num_chain_swaps = 0 

687 while True: 

688 time_elapsed = time.time() - start_time 

689 if (self.max_steps is not None and step > self.max_steps) or (self.duration is not None and time_elapsed > self.duration): 

690 break 

691 

692 # take one step in each chain 

693 degree_of_completion = step / self.max_steps if self.max_steps is not None else time_elapsed / self.duration 

694 for chain in chains: 

695 chain.step(degree_of_completion) 

696 

697 # check if neighbouring chains can be "swapped": if a high-temperature chain has a better state 

698 # than a low-temperature chain, swap them (by exchanging their schedules and swapping them 

699 # in the chains array, which shall always be in descending order of temperature) 

700 for idx_high_temp_chain in range(0, self.num_chains - 1): 

701 idx_low_temp_chain = idx_high_temp_chain+1 

702 high_temp_chain = chains[idx_high_temp_chain] 

703 low_temp_chain = chains[idx_low_temp_chain] 

704 if high_temp_chain.state.cost.value() < low_temp_chain.state.cost.value(): 

705 high_temp_chain.schedule, low_temp_chain.schedule = low_temp_chain.schedule, high_temp_chain.schedule 

706 chains[idx_low_temp_chain] = high_temp_chain 

707 chains[idx_high_temp_chain] = low_temp_chain 

708 num_chain_swaps += 1 

709 

710 if self.log_cost_progression: 

711 for idx_chain, chain in enumerate(chains): 

712 cost_progressions[idx_chain].append(chain.state.cost.value()) 

713 

714 step += 1 

715 

716 self.log.info(f"Number of chain swaps: {num_chain_swaps}") 

717 if self.log_cost_progression: self._cost_progressions = cost_progressions 

718 

719 # apply best solution 

720 best_chain_idx = int(np.argmin([chain.best_cost.value() for chain in chains])) 

721 chains[best_chain_idx].apply_best_state() 

722 return chains[best_chain_idx].state 

723 

724 def plot_cost_progression(self): 

725 if not self.log_cost_progression or self._cost_progressions is None: 

726 raise Exception("No cost progression was logged") 

727 series = {} 

728 for scheduleParamStr, costProgression in zip(self._schedule_param_strings, self._cost_progressions): 

729 series[scheduleParamStr] = costProgression 

730 plt.figure() 

731 pd.DataFrame(series).plot()