Coverage for src/sensai/minizinc.py: 0%
102 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-13 22:17 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-13 22:17 +0000
1from abc import ABC, abstractmethod
2import logging
3import math
4import os
5import re
6import subprocess
7import tempfile
8import time
9from typing import List
11import numpy as np
14log = logging.getLogger(__name__)
class CostScaler:
    """
    Serves to scale floats and convert them into integers (and vice versa) whilst
    maintaining decimal precision.

    The scaling factor is a power of ten chosen from the smallest order of magnitude
    among the given cost values, such that this smallest value still retains the
    requested number of significant digits after rounding to an integer.
    """

    def __init__(self, cost_values: List[float], significant_digits: int):
        """
        Parameters:
            cost_values: the sequence of cost values whose precision should be maintained in the int realm;
                zeros and non-finite values are ignored when determining the scale, and negative
                values are considered by their absolute value
            significant_digits: the number of significant digits that shall at least be maintained
        """
        # log10 is undefined for 0 (-inf) and for negative numbers (nan); either would
        # corrupt the exponent, so only the magnitude of usable values is considered.
        magnitudes = [
            math.floor(math.log10(abs(v)))
            for v in cost_values
            if v != 0 and math.isfinite(v)
        ]
        # The leading 0 ensures that values >= 1 never reduce the scaling factor below
        # 10^(significant_digits - 1); it also covers an empty list of usable values.
        exp10 = significant_digits - 1 - min([0] + magnitudes)
        self.scalingFactor = math.pow(10, exp10)

    def scaled_int(self, original_value: float) -> int:
        """Returns the scaled value, rounded to the nearest integer"""
        return int(round(original_value * self.scalingFactor))

    def scaled_float(self, original_value: float) -> float:
        """Returns the scaled value without rounding"""
        return original_value * self.scalingFactor

    def original_value(self, scaled_value: float) -> float:
        """Returns the original unscaled value from a scaled value"""
        return scaled_value / self.scalingFactor

    def __str__(self):
        return "CostScaler[factor=%d]" % self.scalingFactor
class MiniZincProblem(ABC):
    """
    Base class for problems that can be expressed as MiniZinc code.
    Subclasses provide the code via :meth:`get_mini_zinc_code`.
    """

    def create_mini_zinc_file(self, f):
        """
        Writes the MiniZinc code of this problem (UTF-8 encoded) to an open file

        :param f: an OS-level handle (file descriptor) to an open, writable file
        """
        remaining = memoryview(self.get_mini_zinc_code().encode('utf-8'))
        # os.write may write fewer bytes than requested; keep writing until
        # the entire code has been handed to the OS.
        while remaining:
            num_written = os.write(f, remaining)
            remaining = remaining[num_written:]

    @abstractmethod
    def get_mini_zinc_code(self):
        """
        :return: the MiniZinc code of the problem as a string
        """
        pass
class MiniZincSolver(object):
    """
    Runs the ``minizinc`` command-line tool on a problem and collects its output.
    The ``minizinc`` executable must be available on the system path.
    """
    # equivalent to a child of the module-level logger, which is obtained via logging.getLogger(__name__)
    log = logging.getLogger(__name__).getChild(__qualname__)

    def __init__(self, name='OSICBC', solver_time_seconds=None, fzn_output_path=None):
        """
        :param name: name of solver compatible with miniZinc
        :param solver_time_seconds: upper time limit for solver in seconds
        :param fzn_output_path: flatZinc output path
        """
        self.solver_name = name
        self.solver_time_limit_secs = solver_time_seconds
        self.fzn_output_path = fzn_output_path
        self.last_solver_time_secs = None
        self.last_solver_output = None
        # Never assigned by this class (stderr is merged into stdout in solve_path);
        # the attribute name is retained for backward compatibility.
        self.lastSolverErrOutput = None

    def __str__(self):
        return f"MiniZincSolver[{self.solver_name}]"

    def solve_path(self, mzn_path: str, log_info=True) -> str:
        """
        Solves the MiniZinc problem stored at the given file path

        :param mzn_path: path to file containing MiniZinc problem code
        :param log_info: whether to log solver output at INFO level rather than DEBUG level
        :return: the solver output (stdout and stderr combined)
        :raises Exception: if the minizinc process terminates with a non-zero return code
        :raises OSError: if the minizinc executable cannot be started (e.g. it is not on the path)
        """
        self.last_solver_time_secs = None
        log_solver = self.log.info if log_info else self.log.debug

        command = ["minizinc", "--statistics", "--solver", str(self.solver_name)]
        if self.solver_time_limit_secs is not None:
            # the limit is passed in milliseconds and must be integral (a float limit would otherwise render as e.g. "1500.0")
            command += ["--time-limit", str(int(round(self.solver_time_limit_secs * 1000)))]
        if self.fzn_output_path is not None:
            command += ["--output-fzn-to-file", str(self.fzn_output_path)]
        command.append(str(mzn_path))

        self.log.info("Running %s" % " ".join(command))
        start_time = time.monotonic()
        output_lines = []
        # The arguments are passed as a list without a shell, such that paths containing
        # spaces or shell metacharacters reach minizinc unchanged (and cannot be
        # interpreted as shell syntax). The context manager closes the pipe and waits
        # for the process to terminate, even if reading the output fails.
        with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as proc:
            for raw_line in proc.stdout:
                line = raw_line.decode("utf-8", errors="replace")
                output_lines.append(line)
                log_solver("Solver output: %s" % line.rstrip())
        output = "".join(output_lines)
        if proc.returncode != 0:
            raise Exception(f"MiniZinc call failed with return code {proc.returncode}; output: {output}")
        self.last_solver_time_secs = time.monotonic() - start_time
        self.last_solver_output = output
        self.log.info("Solver time: %.1fs" % self.last_solver_time_secs)
        return output

    def solve_problem(self, problem: "MiniZincProblem", keep_temp_file=False, log_info=True) -> str:
        """
        Solves the given MiniZinc problem

        :param problem: the problem to solve
        :param keep_temp_file: whether to keep the temporary .mzn file
        :param log_info: whether to log solver output at INFO level rather than DEBUG level
        :return: the solver output
        """
        f, path = tempfile.mkstemp(".mzn")
        try:
            try:
                problem.create_mini_zinc_file(f)
            finally:
                # the descriptor must be closed before minizinc reads the file
                os.close(f)
            return self.solve_path(path, log_info=log_info)
        finally:
            if not keep_temp_file:
                os.unlink(path)

    def get_last_solver_time_secs(self):
        """
        :return: the duration in seconds of the last successful solver run, or None if there was none
        """
        return self.last_solver_time_secs
def extract_1d_array_from_output(string_identifier: str, output: str) -> List:
    """
    Extracts a one-dimensional array from solver output of the form
    ``<identifier> = array1d(<from>..<to>, [<elements>])``

    :param string_identifier: the name of the array variable in the output
    :param output: the solver output to search
    :return: the array elements as a list
    :raises Exception: if the output contains no matching array
    :raises ValueError: if the matched element list is not a plain Python literal
    """
    import ast  # local import, such that this function does not depend on changes to the module's imports

    regex = r'{stringIdentifier} = array1d\(\d+\.\.\d+, (\[.*?\])'.format(stringIdentifier=re.escape(string_identifier))
    match = re.search(regex, output)
    if match is None:
        raise Exception("No match found for regex: %s" % regex)
    # NOTE: this previously used eval, which would execute arbitrary expressions found in the
    # solver output; literal_eval accepts literals only (numbers, strings, lists, ...).
    return ast.literal_eval(match.group(1))
def extract_multi_dim_array_from_output(string_identifier: str, dim: int, output: str, boolean=False) -> np.ndarray:
    """
    Extracts a multi-dimensional array from solver output of the form
    ``<identifier> = array<dim>d(1..<n1>, ..., 1..<ndim>, [<elements>])``

    :param string_identifier: the name of the array variable in the output
    :param dim: the number of dimensions of the array
    :param output: the solver output to search
    :param boolean: whether the elements are the literals ``true``/``false``, which are then converted to 1/0
    :return: an array with shape (n1, ..., ndim)
    :raises Exception: if the output contains no matching array
    :raises ValueError: if the matched element list is not a plain Python literal or does not fit the shape
    """
    import ast  # local import, such that this function does not depend on changes to the module's imports

    # one capture group per dimension (index ranges always start at 1); the dots are
    # escaped so that only a literal ".." is accepted
    dim_regex = r"1\.\.(\d+), "
    regex = r'{stringIdentifier} = array{dim}d\({dimsRegex}(\[.*?\])'.format(
        stringIdentifier=re.escape(string_identifier), dim=dim, dimsRegex=dim_regex * dim)
    match = re.search(regex, output)
    if match is None:
        raise Exception("No match found for regex: %s" % regex)
    shape = [int(match.group(i)) for i in range(1, dim + 1)]
    flat_list = match.group(dim + 1)
    if boolean:
        flat_list = flat_list.replace("false", "0").replace("true", "1")
    # NOTE: this previously used eval, which would execute arbitrary expressions found in the
    # solver output; literal_eval accepts literals only (numbers, strings, lists, ...).
    flat_list = ast.literal_eval(flat_list)
    return np.array(flat_list).reshape(shape)
def array_to_mini_zinc(a: np.ndarray, element_cast):
    """
    Converts an array to a MiniZinc array expression of the form
    ``array<d>d(1..<n1>, ..., 1..<nd>, [<elements>])``, where the elements are
    listed in the order given by ``a.flatten()``

    :param a: the array to convert
    :param element_cast: a function applied to every element before rendering (e.g. ``int``, ``float`` or ``bool``)
    :return: the MiniZinc expression as a string
    """
    def render(value) -> str:
        # MiniZinc's Boolean literals are lower-case; Python's repr would yield True/False
        if isinstance(value, (bool, np.bool_)):
            return "true" if value else "false"
        # same rendering as str(list) uses for its elements
        return repr(value)

    shape = a.shape
    dims = ", ".join(f"1..{n}" for n in shape)
    values = ", ".join(render(element_cast(v)) for v in a.flatten())
    return f"array{len(shape)}d({dims}, [{values}])"