Coverage for src/sensai/minizinc.py: 0%

102 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-13 22:17 +0000

1from abc import ABC, abstractmethod 

2import logging 

3import math 

4import os 

5import re 

6import subprocess 

7import tempfile 

8import time 

9from typing import List 

10 

11import numpy as np 

12 

13 

14log = logging.getLogger(__name__) 

15 

16 

17class CostScaler: 

18 """ 

19 Serves to scale floats and converts them into integers (and vice versa) whilst 

20 maintaining decimal precision 

21 """ 

22 

23 def __init__(self, cost_values: List[float], significant_digits: int): 

24 """ 

25 Parameters: 

26 cost_values: the sequence of cost values whose precision should be maintained in the int realm 

27 significant_digits: the number of significant digits that shall at least be maintained 

28 """ 

29 exp10 = significant_digits - 1 - min([0] + [np.floor(np.log10(v)) for v in cost_values]) 

30 self.scalingFactor = math.pow(10, exp10) 

31 

32 def scaled_int(self, original_value: float) -> int: 

33 """Returns the scaled value as an integer""" 

34 return int(round(original_value * self.scalingFactor)) 

35 

36 def scaled_float(self, original_value: float) -> float: 

37 return original_value * self.scalingFactor 

38 

39 def original_value(self, scaled_value: float) -> float: 

40 """Returns the original unscaled value from a scaled value""" 

41 return scaled_value / self.scalingFactor 

42 

43 def __str__(self): 

44 return "CostScaler[factor=%d]" % self.scalingFactor 

45 

46 

47class MiniZincProblem(ABC): 

48 

49 def create_mini_zinc_file(self, f): 

50 """ 

51 Writes MiniZinc code 

52 

53 :param f: an OS-level handle to an open file 

54 """ 

55 os.write(f, bytes(self.get_mini_zinc_code(), 'utf-8')) 

56 

57 @abstractmethod 

58 def get_mini_zinc_code(self): 

59 pass 

60 

61 

62class MiniZincSolver(object): 

63 log = log.getChild(__qualname__) 

64 

65 def __init__(self, name='OSICBC', solver_time_seconds=None, fzn_output_path=None): 

66 """ 

67 :param name: name of solver compatible with miniZinc 

68 :param solver_time_seconds: upper time limit for solver in seconds 

69 :param fzn_output_path: flatZinc output path 

70 """ 

71 self.solver_name = name 

72 self.solver_time_limit_secs = solver_time_seconds 

73 self.fzn_output_path = fzn_output_path 

74 self.last_solver_time_secs = None 

75 self.last_solver_output = None 

76 self.lastSolverErrOutput = None 

77 

78 def __str(self): 

79 return f"MiniZincSolver[{self.solver_name}]" 

80 

81 def solve_path(self, mzn_path: str, log_info=True) -> str: 

82 """ 

83 Solves the MiniZinc problem stored at the given file path 

84 

85 :param mzn_path: path to file containing MiniZinc problem code 

86 :param log_info: whether to log solver output at INFO level rather than DEBUG level 

87 :return: the solver output 

88 """ 

89 self.last_solver_time_secs = None 

90 log_solver = self.log.info if log_info else self.log.debug 

91 

92 args = ["--statistics", "--solver", self.solver_name] 

93 if self.solver_time_limit_secs is not None: 

94 args.append("--time-limit") 

95 args.append(str(self.solver_time_limit_secs * 1000)) 

96 if self.fzn_output_path is not None: 

97 args.append("--output-fzn-to-file") 

98 args.append(self.fzn_output_path) 

99 args.append(mzn_path) 

100 command = "minizinc " + " ".join(args) 

101 

102 self.log.info("Running %s" % command) 

103 start_time = time.time() 

104 proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) 

105 output = [] 

106 while True: 

107 line = proc.stdout.readline().decode("utf-8") 

108 if not line: 

109 break 

110 output.append(line) 

111 log_solver("Solver output: %s" % line.rstrip()) 

112 output = "".join(output) 

113 proc.wait() 

114 if proc.returncode != 0: 

115 raise Exception(f"MiniZinc call failed with return code {proc.returncode}; output: {output}") 

116 self.last_solver_time_secs = time.time() - start_time 

117 self.last_solver_output = output 

118 self.log.info("Solver time: %.1fs" % self.last_solver_time_secs) 

119 return output 

120 

121 def solve_problem(self, problem: MiniZincProblem, keep_temp_file=False, log_info=True) -> str: 

122 """ 

123 Solves the given MiniZinc problem 

124 

125 :param problem: the problem to solve 

126 :param keep_temp_file: whether to keep the temporary .mzv file 

127 :param log_info: whether to log solver output at INFO level rather than DEBUG level 

128 :return: the solver output 

129 """ 

130 f, path = tempfile.mkstemp(".mzn") 

131 try: 

132 try: 

133 problem.create_mini_zinc_file(f) 

134 finally: 

135 os.close(f) 

136 return self.solve_path(path, log_info=log_info) 

137 finally: 

138 if not keep_temp_file: 

139 os.unlink(path) 

140 

141 def get_last_solver_time_secs(self): 

142 return self.last_solver_time_secs 

143 

144 

145def extract_1d_array_from_output(string_identifier: str, output: str) -> List: 

146 regexOutput = re.search(r'{stringIdentifier} = array1d\(\d+\.\.\d+, (\[.*?\])'.format(stringIdentifier=string_identifier), output) 

147 return eval(regexOutput.group(1)) 

148 

149 

150def extract_multi_dim_array_from_output(string_identifier: str, dim: int, output: str, boolean=False) -> np.array: 

151 dim_regex = r"1..(\d+), " 

152 regex = r'{stringIdentifier} = array{dim}d\({dimsRegex}(\[.*?\])'.format(stringIdentifier=string_identifier, dim=dim, 

153 dimsRegex=dim_regex*dim) 

154 match = re.search(regex, output) 

155 if match is None: 

156 raise Exception("No match found for regex: %s" % regex) 

157 shape = [int(match.group(i)) for i in range(1, dim+1)] 

158 flat_list = match.group(dim+1) 

159 if boolean: 

160 flat_list = flat_list.replace("false", "0").replace("true", "1") 

161 flat_list = eval(flat_list) 

162 array1d = np.array(flat_list) 

163 arraymd = array1d.reshape(shape) 

164 return arraymd 

165 

166 

167def array_to_mini_zinc(a: np.array, element_cast): 

168 shape = a.shape 

169 dims = ", ".join([f"1..{n}" for n in shape]) 

170 values = str(list(map(element_cast, a.flatten()))) 

171 return f"array{len(shape)}d({dims}, {values})"