Coverage for src/sensai/local_search.py: 28%
384 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-29 18:29 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-29 18:29 +0000
1import collections
2import logging
3import math
4import random
5import time
6from abc import ABC, abstractmethod
7from typing import Optional, Tuple, Callable, Type, Sequence, TypeVar, Generic
9import numpy as np
10import pandas as pd
11from matplotlib import pyplot as plt
13from .util.aggregation import RelativeFrequencyCounter
15log = logging.getLogger(__name__)
18class SATemperatureSchedule(ABC):
19 """
20 Describes how temperature changes as the annealing process goes on.
21 The function maps a degree of completion (in the interval from 0 to 1) to a temperature value.
22 """
24 @abstractmethod
25 def temperature(self, degree_of_completion: float) -> float:
26 """
27 Computes the temperature for a degree of completion in [0, 1]
28 :param degree_of_completion: the degree to which the simulated annealing process has completed in [0, 1]
29 :return: the temperature
30 """
31 pass
33 def probability(self, degree_of_completion: float, cost_delta: float) -> Tuple[float, float]:
34 T = self.temperature(degree_of_completion)
35 p = math.exp(-cost_delta / T) if T > 0.0 else 0.0
36 return p, T
38 @abstractmethod
39 def _get_params(self):
40 pass
42 def __str__(self):
43 return f"{self.__class__.__name__}{self._get_params()}"
46class SATemperatureScheduleFixed(SATemperatureSchedule):
47 """A schedule with a constant temperature"""
48 def __init__(self, t):
49 self.t = t
51 def temperature(self, degree_of_completion):
52 return self.t
54 def _get_params(self):
55 return {"fixedTemp": self.t}
58class SATemperatureScheduleExponential(SATemperatureSchedule):
59 """ A temperature schedule for simulated annealing where the temperature drops exponentially. """
61 def __init__(self, t0, t1, exponent_factor):
62 """
63 :param t0: temperature at the beginning (degree of completion 0)
64 :param t1: temperature at the end (degree of completion 1), which must be smaller than t0 and larger than 0
65 :param exponent_factor: factor with which to multiply the exponent (the larger the factor, the faster the temperature drops)
66 """
67 super().__init__()
68 if t1 > t0 or t0 <= 0:
69 raise ValueError("Inadmissible temperatures given.")
71 if exponent_factor < 1:
72 raise ValueError("The exponent factor cannot be less than 1.")
74 self.t0 = t0
75 self.t1 = t1
76 self.t_drop = t0 - t1
77 self.exponent_factor = exponent_factor
79 def temperature(self, degree_of_completion):
80 return self.t0 - self.t_drop * (1.0 - math.exp(-self.exponent_factor * degree_of_completion))
82 def _get_params(self):
83 return {"t0": self.t0, "t1": self.t1, "exp_factor": self.exponent_factor}
86def reverse_sigmoid(degree_of_completion, v0, v1, steepness, mid_degree=0.5):
87 return v0 - (v0-v1) / (1.0 + math.exp(-steepness * (degree_of_completion - mid_degree)))
90class SATemperatureScheduleReverseSigmoid(SATemperatureSchedule):
91 """
92 A temperature schedule for simulated annealing where the temperature drops in a reverse sigmoid curve (based on the logistic function)
93 """
95 def __init__(self, t0, t1, steepness, mid_degree=0.5):
96 """
97 :param t0: temperature at the beginning (degree of completion 0)
98 :param t1: temperature at the end (degree of completion 1), which must be smaller than t0 and larger than 0
99 :param steepness: Factor which roughly corresponds to the (negative) slope at the inflection point 0.5.
100 For the sigmoid shape to be sufficiently pronounced, a value of at least 5 is recommended (the closer the value is to 1,
101 the more the curve approaches a linear decay).
102 :param mid_degree: the degree of completion at which the function shall return the temperature (t0+t1)/2
103 """
104 super().__init__()
105 if t1 > t0 or t0 <= 0:
106 raise ValueError("Inadmissible temperatures given.")
107 if mid_degree < 0 or mid_degree > 1:
108 raise Exception("Mid-degree must be between 0 and 1")
110 self.t0 = t0
111 self.t1 = t1
112 self.steepness = steepness
113 self.mid_degree = mid_degree
115 def temperature(self, degree_of_completion):
116 return reverse_sigmoid(degree_of_completion, self.t0, self.t1, self.steepness, self.mid_degree)
118 def _get_params(self):
119 return {"t0": self.t0, "t1": self.t1, "steepness": self.steepness, "mid_degree": self.mid_degree}
122class SATemperatureScheduleReverseSigmoidSymmetric(SATemperatureScheduleReverseSigmoid):
123 """
124 A variant of the logistic schedule with a reverse sigmoid shape, where the probability of acceptance for a given assumed positive
125 cost delta is symmetric. "Symmetric" here means that half the schedule is to be above 0.5 and half of it below 0.5.
126 """
127 def __init__(self, t0, t1, steepness, cost_delta_for_symmetry):
128 """
129 :param t0: temperature at the beginning (degree of completion 0)
130 :param t1: temperature at the end (degree of completion 1), which must be smaller than t0 and larger than 0
131 :param steepness: Factor which roughly corresponds to the (negative) slope at the inflection point 0.5.
132 For the sigmoid shape to be sufficiently pronounced, a value of at least 8 is recommended (the closer the value is to 1,
133 the more the curve approaches a linear decay).
134 :param cost_delta_for_symmetry: the (positive) cost delta value which the curve shall be "symmetric"
135 """
136 k = steepness
137 cdelta = cost_delta_for_symmetry
138 mid_degree = -(2*math.log(-(13614799*t0-19642003*cdelta)/(13614799*t1-19642003*cdelta))-k)/(2*k)
139 super().__init__(t0, t1, steepness, mid_degree)
142class SATemperatureSchedulePower(SATemperatureSchedule):
143 """
144 A temperature schedule for simulated annealing where the temperature drops with powers of the degree of completion d,
145 i.e. the temperature drop is proportional to d to the power of e for a given exponent e.
146 """
148 def __init__(self, t0, t1, exponent):
149 """
150 :param t0: temperature at the beginning (degree of completion 0)
151 :param t1: temperature at the end (degree of completion 1), which must be smaller than t0 and larger than 0
152 :param exponent: the exponent of the power function
153 """
154 super().__init__()
155 if t1 > t0 or t0 <= 0:
156 raise ValueError("Inadmissible temperatures given.")
158 self.t0 = t0
159 self.t1 = t1
160 self.tDrop = t0 - t1
161 self.exponent = exponent
163 def temperature(self, degree_of_completion):
164 return self.t0 - self.tDrop * (math.pow(degree_of_completion, self.exponent))
166 def _get_params(self):
167 return {"t0": self.t0, "t1": self.t1, "exponent": self.exponent}
170class SAProbabilityFunction(ABC):
171 def __init__(self, **params_dict):
172 self.params_dict = params_dict
174 def __str__(self):
175 return f"{self.__class__.__name__}{self.params_dict}"
177 @abstractmethod
178 def __call__(self, degree_of_completion):
179 pass
182class SAProbabilityFunctionLinear(SAProbabilityFunction):
183 """A probability function where probabilities decay linearly"""
184 def __init__(self, p0=1.0, p1=0.0):
185 super().__init__(p0=p0, p1=p1)
186 self.p0 = p0
187 self.p1 = p1
189 def __call__(self, degree_of_completion):
190 return self.p0 - degree_of_completion * (self.p0 - self.p1)
193class SAProbabilityFunctionReverseSigmoid(SAProbabilityFunction):
194 """A probability function where probabilities decay in a reverse sigmoid shape"""
195 def __init__(self, p0=1.0, p1=0.0, steepness=10):
196 """
197 :param p0: the probability at the beginning (degree of completion 0)
198 :param p1: the probability at the end (degree of completion 1)
199 :param steepness: the steepness of the sigmoid curve
200 """
201 super().__init__(p0=p0, p1=p1, steepness=steepness)
202 self.p0 = p0
203 self.p1 = p1
204 self.steepness = steepness
206 def __call__(self, degree_of_completion):
207 return reverse_sigmoid(degree_of_completion, self.p0, self.p1, self.steepness)
210class SAProbabilityFunctionConstant(SAProbabilityFunction):
211 """A constant probability function (which returns the same probability for all degrees of completion)"""
212 def __init__(self, p):
213 super().__init__(p=p)
214 self.p = p
216 def __call__(self, degree_of_completion):
217 return self.p
220class SAProbabilitySchedule(SATemperatureSchedule):
221 """
222 A temperature schedule where temperatures are derived from a probability schedule that is to apply to a reference cost delta, which
223 is either given or is computed from observed values (the latter resulting in an adaptive schedule).
224 It converts a function that returns probabilities for degrees of completion into a corresponding temperature schedule.
225 """
226 def __init__(self, reference_cost_delta: Optional[float], probability_function: SAProbabilityFunction):
227 """
228 Creates a temperature schedule for a reference cost delta (which can also be computed automatically from observed data)
229 and probability function:
230 The schedule will return temperatures such that for referenceCostDelta, the probability of acceptance of a move at
231 degree of completion d in [0,1] will be probabilityFunction(d).
233 :param reference_cost_delta: the (positive) cost delta for which the probability function is to apply;
234 if None, adaptively determine it from the empirical mean
235 :param probability_function: a function which maps degrees of completion in [0,1] to probabilities in [0,1]
236 """
237 self.adaptive = reference_cost_delta is None
238 self.referenceCostDelta = reference_cost_delta
239 self.probabilityFunction = probability_function
240 self.paramsDict = {
241 "refCostDelta": reference_cost_delta if reference_cost_delta is not None else "adaptive",
242 "probabilityFunction": str(probability_function)}
244 # helper variables for adaptive mode
245 self._costDeltaSum = 0
246 self._costDeltaCount = 0
248 def temperature(self, degree_of_completion):
249 if self.adaptive and self._costDeltaCount == 0:
250 raise Exception("Cannot generate a temperature from an adaptive schedule without any previous cost-delta samples")
251 p = self.probabilityFunction(degree_of_completion)
252 if p == 0.0:
253 return 0
254 else:
255 return -self.referenceCostDelta / math.log(p)
257 def probability(self, degree_of_completion, cost_delta):
258 if self.adaptive:
259 self._costDeltaSum += cost_delta
260 self._costDeltaCount += 1
261 self.referenceCostDelta = max(self._costDeltaSum / self._costDeltaCount, 1e-10)
262 return super().probability(degree_of_completion, cost_delta)
264 def _get_params(self):
265 return self.paramsDict
268class SACostValue(ABC):
269 """Representation of an immutable cost value"""
271 @abstractmethod
272 def value(self):
273 """Returns the numeric cost value"""
274 pass
276 @abstractmethod
277 def add(self, other) -> 'SACostValue':
278 pass
281class SACostValueNumeric(SACostValue):
282 def __init__(self, scalar):
283 self._value = scalar
285 def __str__(self):
286 return str(self.value())
288 def value(self):
289 return self._value
291 def add(self, other: 'SACostValueNumeric') -> 'SACostValueNumeric':
292 return SACostValueNumeric(self._value + other._value)
295class SAState(ABC):
296 """Represents the state/variable assignment during a simulated annealing process"""
298 def __init__(self, r: random.Random):
299 self.r = r
300 self.cost = self.compute_cost_value()
302 @abstractmethod
303 def compute_cost_value(self) -> SACostValue:
304 """Computes the costs of this state (from scratch)"""
305 pass
307 @abstractmethod
308 def get_state_representation(self):
309 """
310 Returns a compact state representation (for the purpose of archiving a hitherto best result), which can later be
311 applied via applyStateRepresentation.
313 :return: a compact state representation of some sort
314 """
315 pass
317 @abstractmethod
318 def apply_state_representation(self, representation):
319 """
320 Applies the given state representation (as returned via `getStateRepresentation`) in order for the optimisation result to
321 be obtained by the user.
322 Note that the function does not necessarily need to change this state to reflect the representation, as its sole
323 purpose is for the optimsation result to be obtainable thereafter (it is not used during the optimisation process as such).
325 :param representation: a representation as returned by `getStateRepresentation`
326 """
327 pass
330TSAState = TypeVar("TSAState", bound=SAState)
333class SAOperator(Generic[TSAState]):
334 """
335 An operator which, when applied with appropriately chosen parameters, can transform a state into another
336 state during simulated annealing
337 """
339 def __init__(self, state: TSAState):
340 """
341 :param state: the state to which the operator is applied
342 """
343 self.state = state
345 def apply_cost_change(self, cost_delta: SACostValue):
346 """
347 Applies the cost change to the state given at construction
349 :param cost_delta: the cost change to apply
350 """
351 self.state.cost = self.state.cost.add(cost_delta)
353 @abstractmethod
354 def apply_state_change(self, *params):
355 """
356 Applies the operator to the state, i.e. it makes the changes to the state only
357 (and does not update the associated costs)
359 :param params: the parameters with which the operator is to be applied
360 :return:
361 """
362 pass
364 def apply(self, params: Tuple, cost_delta: SACostValue):
365 """
366 Applies the operator to the state given at construction, changing the state and updating the costs appropriately
368 :param params: the parameters with which the operator is to be applied
369 :param cost_delta: the cost change that results from the application
370 :return:
371 """
372 self.apply_cost_change(cost_delta)
373 self.apply_state_change(*params)
375 @abstractmethod
376 def cost_delta(self, *params) -> SACostValue:
377 """
378 Computes the cost change that would apply when applying this operator with the given parameters
380 :param params: an arbitrary list of parameters (specific to the concrete operator)
381 :return:
382 """
383 pass
385 @abstractmethod
386 def choose_params(self) -> Optional[Tuple[Tuple, Optional[SACostValue]]]:
387 """
388 Chooses parameters for the application of the operator (e.g. randomly or greedily).
390 :return: a tuple (params, costValue) or None if no suitable parameters are found, where params is the list of chosen
391 parameters and costValue is either an instance of CostValue or None if the costs have not been computed.
392 """
393 pass
396class SAChain(Generic[TSAState]):
397 """Manages the progression of one state during simulated annealing"""
399 log = log.getChild(__qualname__)
401 def __init__(self,
402 state_factory: Callable[[random.Random], TSAState],
403 schedule: SATemperatureSchedule,
404 ops_and_weights: Sequence[Tuple[Callable[[TSAState], SAOperator[TSAState]], float]],
405 random_seed,
406 collect_stats=False):
407 self.schedule = schedule
408 self.r = random.Random(random_seed)
409 self.state = state_factory(self.r)
410 self.collect_stats = collect_stats
411 operators, weights = zip(*ops_and_weights)
412 cum_weights, s = [], 0
413 for weight in weights:
414 s += weight
415 cum_weights.append(s)
416 self.ops = [cons(self.state) for cons in operators]
417 self.op_cum_weights = cum_weights
418 self.steps_taken = 0
419 self.count_none_params = 0
420 self.count_best_updates = -1
421 self.best_cost = None
422 self.best_state_repr = None
423 self.logged_series = collections.defaultdict(lambda: [])
424 self._update_best_state()
426 if self.collect_stats:
427 self.operator_inapplicability_counters = {}
428 for op in self.ops:
429 self.operator_inapplicability_counters[op] = RelativeFrequencyCounter()
431 def _update_best_state(self):
432 cost = self.state.cost
433 if self.best_cost is None or cost.value() < self.best_cost.value():
434 self.best_cost = cost
435 self.best_state_repr = self.state.get_state_representation()
436 self.count_best_updates += 1
438 def step(self, degree_of_completion):
439 r = self.r
441 # make move
442 op = r.choices(self.ops, cum_weights=self.op_cum_weights, k=1)[0]
443 param_choice = op.choose_params()
444 if param_choice is None:
445 self.count_none_params += 1
446 else:
447 params, cost_change = param_choice
448 if cost_change is None:
449 cost_change = op.cost_delta(*params)
450 if cost_change.value() < 0:
451 make_move = True
452 else:
453 cost_change_value = cost_change.value()
454 p, T = self.schedule.probability(degree_of_completion, cost_change_value)
455 make_move = r.random() <= p
456 self.log.debug(f'p: {p}, T: {T}, costDelta: {cost_change_value}, move: {make_move}')
457 if self.collect_stats:
458 self.logged_series["temperatures"].append(T)
459 self.logged_series["probabilities"].append(p)
460 if make_move:
461 op.apply(params, cost_change)
462 self._update_best_state()
463 if self.collect_stats:
464 self.logged_series["costDeltas"].append(cost_change.value())
465 if self.collect_stats:
466 self.logged_series["bestCostValues"].append(self.best_cost.value())
467 self.logged_series["costValues"].append(self.state.cost.value())
468 self.operator_inapplicability_counters[op].count(param_choice is None)
470 self.steps_taken += 1
472 if self.log.isEnabledFor(logging.DEBUG):
473 self.log.debug(f"Step {self.steps_taken}: cost={self.state.cost}; best cost={self.best_cost}")
475 def log_stats(self):
476 if self.collect_stats:
477 stats = {"useless moves total (None params)": f"{self.count_none_params}/{self.steps_taken}"}
478 for op, counter in self.operator_inapplicability_counters.items():
479 stats[f"useless moves of {op}"] = str(counter)
480 logged_cost_deltas = self.logged_series["costDeltas"]
481 if logged_cost_deltas:
482 stats["mean cost delta"] = f"{np.mean(logged_cost_deltas):.3f} +- { np.std(logged_cost_deltas):.3f}"
483 abs_cost_deltas = np.abs(logged_cost_deltas)
484 stats["mean absolute cost delta"] = f"{np.mean(abs_cost_deltas):.3f} +- {np.std(abs_cost_deltas):.3f}"
485 positive_cost_deltas = [cd for cd in logged_cost_deltas if cd > 0]
486 if positive_cost_deltas:
487 stats["positive cost delta"] = f"mean={np.mean(positive_cost_deltas):.3f} +- {np.std(positive_cost_deltas):.3f}," \
488 f" max={np.max(positive_cost_deltas):.3f}"
489 stats_join = "\n "
490 self.log.info(f"Stats: {stats_join.join([key + ': ' + value for (key, value) in stats.items()])}")
491 self.log.info(f"Best solution has {self.best_cost} after {self.count_best_updates} updates of best state")
493 def apply_best_state(self):
494 """Applies the best state representation found in this chain to the chain's state"""
495 self.state.apply_state_representation(self.best_state_repr)
496 self.state.cost = self.best_cost
498 def plot_series(self, series_name):
499 """
500 Plots one of the logged series
502 :param series_name: the name of the series (see getSeries)
503 """
504 series = self.get_series(series_name)
505 plt.figure()
506 series.plot(title=series_name)
508 def get_series(self, series_name):
509 """
510 Gets one of the logged series (for collectStats==True)
512 :param series_name: name of the series: one of "temperatures", "probabilities", "costDeltas", "bestCostValues", "costValues
513 """
514 if not self.collect_stats:
515 raise Exception("No stats were collected")
516 if series_name not in self.logged_series:
517 raise Exception("Unknown series")
518 return pd.Series(self.logged_series[series_name])
521class SimulatedAnnealing(Generic[TSAState]):
522 """
523 The simulated annealing algorithm for discrete optimisation (cost minimisation)
524 """
526 log = log.getChild(__qualname__)
528 def __init__(self,
529 schedule_factory: Callable[[], SATemperatureSchedule],
530 ops_and_weights: Sequence[Tuple[Callable[[TSAState], SAOperator[TSAState]], float]],
531 max_steps: int = None,
532 duration: float = None,
533 random_seed=42,
534 collect_stats=False):
535 """
536 :param schedule_factory: a factory for the creation of the temperature schedule for the annealing process
537 :param ops_and_weights: a list of operator factories with associated weights, where weights are to indicate the (non-normalised)
538 probability of choosing the associated operator
539 :param max_steps: the number of steps for which to run the optimisation; may be None (if not given, duration must be provided)
540 :param duration: the duration, in seconds, for which to run the optimisation; may be None (if not given, maxSteps must be provided)
541 :param random_seed: the random seed to use for all random choices
542 :param collect_stats: flag indicating whether to collect additional statics which will be logged
543 """
544 if max_steps is not None and max_steps <= 0:
545 raise ValueError("The number of iterations should be greater than 0.")
546 if max_steps is None and duration is None or (max_steps is not None and duration is not None):
547 raise ValueError("Exactly one of {maxSteps, duration} must be specified.")
548 if duration is not None and duration <= 0:
549 raise ValueError("Duration must be greater than 0 if provided")
550 self.scheduleFactory = schedule_factory
551 self.max_steps = max_steps
552 self.duration = duration
553 self.randomSeed = random_seed
554 self.opsAndWeights = ops_and_weights
555 self.collect_stats = collect_stats
556 self._chain = None
558 def optimise(self, state_factory: Callable[[random.Random], TSAState]) -> TSAState:
559 """
560 Applies the annealing process, starting with a state created via the given factory.
562 :param state_factory: the factory with which to create the (initial) state
563 :return: the state with the least-cost representation found during the optimisation applied
564 """
565 chain = SAChain(state_factory, self.scheduleFactory(), ops_and_weights=self.opsAndWeights, random_seed=self.randomSeed,
566 collect_stats=self.collect_stats)
567 self.log.info(f"Running simulated annealing with {len(self.opsAndWeights)} operators for "
568 f"{'%d steps' % self.max_steps if self.max_steps is not None else '%d seconds' % self.duration} ...")
569 start_time = time.time()
570 while True:
571 time_elapsed = time.time() - start_time
572 if (self.max_steps is not None and chain.steps_taken >= self.max_steps) or (self.duration is not None and time_elapsed >= self.duration):
573 break
574 if self.max_steps is not None:
575 degree_of_completion = chain.steps_taken / self.max_steps
576 else:
577 degree_of_completion = time_elapsed / self.duration
578 chain.step(degree_of_completion)
579 self.log.info(f"Simulated annealing completed after {time.time()-start_time:.1f} seconds, {chain.steps_taken} steps")
580 chain.log_stats()
581 chain.apply_best_state()
582 if self.collect_stats:
583 self._chain = chain
584 return chain.state
586 def get_chain(self) -> Optional[SAChain[TSAState]]:
587 """
588 Gets the chain used by the most recently completed application (optimise call) of this object
589 for the case where stats collection was enabled; the chain then contains relevant series and may be used
590 to generate plots. If stats collection was not enabled, returns None.
591 """
592 return self._chain
595class ParallelTempering(Generic[TSAState]):
596 """
597 The parallel tempering algorithm for discrete optimisation (cost minimisation)
598 """
600 log = log.getChild(__qualname__)
602 def __init__(self,
603 num_chains,
604 ops_and_weights: Sequence[Tuple[Callable[[TSAState], SAOperator[TSAState]], float]],
605 schedule: SATemperatureSchedule = None,
606 probability_function: SAProbabilityFunction = None,
607 max_steps: int = None,
608 duration: float = None,
609 random_seed=42,
610 log_cost_progression=False):
611 """
612 Creates a parallel tempering optimiser with the given number of chains and operators for each chain.
613 To determine the schedule to use for each chain, either schedule or probabilityFunction must be provided.
614 It is usually more robust to use adaptive schedules and therefore to provide probabilityFunction.
616 :param num_chains: the number of parallel chains
617 :param ops_and_weights: a list of operators with associated weights (which are to indicate the non-normalised probability of
618 choosing the associated operator)
619 :param schedule: the temperature schedule from which numChains temperatures of chains are drawn (using equidistant degrees of
620 completion); if None, must provide probabilityFunction
621 :param probability_function: the probability function from which numChains probabilities for adaptive probability schedules, each
622 using a constant probability, are to be drawn; if None, must provide schedule
623 :param max_steps: the number of steps for which to run the optimisation; may be None (if not given, duration must be provided)
624 :param duration: the duration, in seconds, for which to run the optimisation; may be None (if not given, maxSteps must be provided)
625 :param random_seed: the random seed to use for all random choices
626 :param log_cost_progression: whether to log cost progression of all chains (such that it can be plotted after the fact via
627 plotCostProgression)
628 """
629 if max_steps is not None and max_steps <= 0:
630 raise ValueError("The number of iterations should be greater than 0.")
631 if (max_steps is None and duration is None) or (max_steps is not None and duration is not None):
632 raise ValueError("Exactly one of {maxSteps, duration} must be specified.")
633 if duration is not None and duration <= 0:
634 raise ValueError("duration should be greater than 0 if provided.")
635 if num_chains < 2:
636 raise ValueError("Number of chains must be at least 2.")
637 if (schedule is None and probability_function is None) or (schedule is not None and probability_function is not None):
638 raise ValueError("Exactly one of {schedule, probabilityFunction} must be given")
639 self.max_steps = max_steps
640 self.duration = duration
641 self.random_seed = random_seed
642 self.num_chains = num_chains
643 self.base_schedule = schedule
644 self.base_probability_function = probability_function
645 self.ops_and_weights = ops_and_weights
646 self.log_cost_progression = log_cost_progression
648 # transient members
649 self._cost_progressions = None
650 self._schedule_param_strings = None
652 def _create_schedules(self):
653 degree_step = 1.0 / (self.num_chains - 1)
654 degrees_of_completion = [i*degree_step for i in range(self.num_chains)]
655 if self.base_schedule is not None:
656 # create schedules with fixed temperatures taken from base schedule
657 temperatures = [self.base_schedule.temperature(d) for d in degrees_of_completion]
658 self._schedule_param_strings = ["T=%.2f" % t for t in temperatures]
659 return [SATemperatureScheduleFixed(t) for t in temperatures]
660 else:
661 # create adaptive probability schedules based on probabilities taken from base probability function
662 probabilities = [self.base_probability_function(d) for d in degrees_of_completion]
663 self._schedule_param_strings = ["p=%.3f" % p for p in probabilities]
664 return [SAProbabilitySchedule(None, SAProbabilityFunctionConstant(p)) for p in probabilities]
666 def optimise(self, state_factory: Callable[[random.Random], SAState]) -> SAState:
667 """
668 Applies the optimisation process, starting, in each chain, with a state created via the given factory.
670 :param state_factory: the factory with which to create the states for all chains
671 :return: the state with the least-cost representation found during the optimisation (among all parallel chains) applied
672 """
673 self.log.info(f"Running parallel tempering with {self.num_chains} chains, {len(self.ops_and_weights)} operators for "
674 f"{'%d steps' % self.max_steps if self.max_steps is not None else '%d seconds' % self.duration} ...")
676 r = random.Random(self.random_seed)
677 chains = []
678 cost_progressions = []
679 for i, schedule in enumerate(self._create_schedules(), start=1):
680 self.log.info(f"Chain {i} uses {schedule}")
681 chains.append(SAChain(state_factory, schedule, ops_and_weights=self.ops_and_weights, random_seed=r.randint(0, 1000)))
682 cost_progressions.append([])
684 start_time = time.time()
685 step = 0
686 num_chain_swaps = 0
687 while True:
688 time_elapsed = time.time() - start_time
689 if (self.max_steps is not None and step > self.max_steps) or (self.duration is not None and time_elapsed > self.duration):
690 break
692 # take one step in each chain
693 degree_of_completion = step / self.max_steps if self.max_steps is not None else time_elapsed / self.duration
694 for chain in chains:
695 chain.step(degree_of_completion)
697 # check if neighbouring chains can be "swapped": if a high-temperature chain has a better state
698 # than a low-temperature chain, swap them (by exchanging their schedules and swapping them
699 # in the chains array, which shall always be in descending order of temperature)
700 for idx_high_temp_chain in range(0, self.num_chains - 1):
701 idx_low_temp_chain = idx_high_temp_chain+1
702 high_temp_chain = chains[idx_high_temp_chain]
703 low_temp_chain = chains[idx_low_temp_chain]
704 if high_temp_chain.state.cost.value() < low_temp_chain.state.cost.value():
705 high_temp_chain.schedule, low_temp_chain.schedule = low_temp_chain.schedule, high_temp_chain.schedule
706 chains[idx_low_temp_chain] = high_temp_chain
707 chains[idx_high_temp_chain] = low_temp_chain
708 num_chain_swaps += 1
710 if self.log_cost_progression:
711 for idx_chain, chain in enumerate(chains):
712 cost_progressions[idx_chain].append(chain.state.cost.value())
714 step += 1
716 self.log.info(f"Number of chain swaps: {num_chain_swaps}")
717 if self.log_cost_progression: self._cost_progressions = cost_progressions
719 # apply best solution
720 best_chain_idx = int(np.argmin([chain.best_cost.value() for chain in chains]))
721 chains[best_chain_idx].apply_best_state()
722 return chains[best_chain_idx].state
724 def plot_cost_progression(self):
725 if not self.log_cost_progression or self._cost_progressions is None:
726 raise Exception("No cost progression was logged")
727 series = {}
728 for scheduleParamStr, costProgression in zip(self._schedule_param_strings, self._cost_progressions):
729 series[scheduleParamStr] = costProgression
730 plt.figure()
731 pd.DataFrame(series).plot()