Coverage for src/sensai/vectoriser.py: 0%

204 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-13 22:17 +0000

1from abc import ABC, abstractmethod 

2from enum import Enum 

3import logging 

4from typing import Callable, Union, TypeVar, Generic, Sequence, List, Tuple, Iterable, Dict, Hashable, Optional 

5 

6import numpy as np 

7 

8from .util import LogTime 

9from .util.pickle import setstate 

10from .util.string import list_string, ToStringMixin 

11 

# Type variable for the kind of item that vectorisers in this module take as input
T = TypeVar("T")

# Module-level logger named after this module; the classes below derive child loggers from it
log = logging.getLogger(__name__)

15 

16 

class Vectoriser(Generic[T], ToStringMixin):
    """
    A vectoriser represents a method for the conversion of instances of some type T into
    vectors, i.e. one-dimensional (numeric) arrays, or (in the special case of a 1D vector) scalars.

    Lists and scalars returned by the underlying function are converted to numpy arrays, such that
    the results of a vectoriser are always arrays.
    """

    log = log.getChild(__qualname__)

    def __init__(self, f: Callable[[T], Union[float, np.ndarray, list]], transformer=None, is_fitted=False):
        """
        :param f: the function which maps from an instance of T to an array/list/scalar
        :param transformer: an optional transformer (e.g. instance of one of the classes in sklearn.preprocessing)
            which can be used to transform/normalise the generated arrays
        :param is_fitted: whether the vectoriser (and therefore the given transformer) is assumed to be fitted already
        """
        self._fn = f
        self.transformer = transformer
        # determined lazily from the first value returned by f (see _f)
        self._resultType = None
        self.name = None
        self._is_fitted = is_fitted

    def __setstate__(self, state):
        new_default_properties = {
            "_is_fitted": True  # we assume that any persisted objects have been fitted
        }
        setstate(Vectoriser, self, state, new_optional_properties=["_resultType", "name"], renamed_properties={"f": "_fn"},
            new_default_properties=new_default_properties)

    def _tostring_exclude_private(self) -> bool:
        return True

    def is_fitted(self) -> bool:
        """
        :return: whether this vectoriser is considered to be fitted (either because it was declared as such
            at construction or because fit was called)
        """
        return self._is_fitted

    def set_name(self, name):
        """
        :param name: the name to assign to this vectoriser
        """
        self.name = name

    def get_name(self):
        """
        :return: the name of this vectoriser, which is a default name (composed of the class name and the
            object id) if the name has not been set. Note that vectorisers created via a VectoriserRegistry
            are assigned the name under which the respective factory was registered.
        """
        if self.name is None:
            return f"{self.__class__.__name__}-{id(self)}"
        return self.name

    def fit(self, items: Iterable[T]):
        """
        Fits this vectoriser's transformer (if any) on the vectors generated for the given items
        and marks the vectoriser as fitted.

        :param items: the items whose vectors shall be used for fitting
        """
        if self.transformer is not None:
            values = [self._f(item) for item in items]
            self.transformer.fit(np.array(values))
        self._is_fitted = True

    def _f(self, x) -> np.ndarray:
        """
        Applies the value function and converts its result to an array where necessary.

        :param x: the item to vectorise
        :return: the (untransformed) vector
        """
        y = self._fn(x)

        # the result type is determined only once; subsequent results are assumed to be of the same kind
        if self._resultType is None:
            self._resultType = self.ResultType.from_value(y)

        if self._resultType == self.ResultType.LIST:
            y = np.array(y)
        elif self._resultType == self.ResultType.SCALAR:
            y = np.array([y])

        return y

    def apply(self, item: T, transform=True) -> np.ndarray:
        """
        :param item: the item to be vectorised
        :param transform: whether to apply this instance's transformer (if any)
        :return: a vector
        """
        value = self._f(item)
        if self.transformer is not None and transform:
            # the transformer operates on collections of vectors, so wrap and unwrap the single vector
            value = self.transformer.transform([value])[0]
        return value

    def apply_multi(self, items: Iterable[T], transform=True, use_cache=False, verbose=False) -> List[np.ndarray]:
        """
        Applies this vectoriser to multiple items at once.
        Especially for cases where this vectoriser uses a transformer, this method is significantly faster than
        calling apply repeatedly.

        :param items: the items to be vectorised
        :param transform: whether to apply this instance's transformer (if any)
        :param use_cache: whether to apply caching of the value function f given at construction (keeping track of outputs for
            each input object id), which can significantly speed up computation in cases where an item appears more than
            once in the collection of items
        :param verbose: whether to generate log messages
        :return: a list of vectors; if the transformer is applied, the return value is whatever the transformer's
            transform method returns for the list of vectors
        """
        if verbose:
            self.log.info(f"Applying {self}")

        with LogTime("Application", enabled=verbose, logger=self.log):
            if not use_cache:
                compute_value = self._f
            else:
                cache = {}

                def compute_value(x):
                    key = id(x)
                    entry = cache.get(key)
                    if entry is None:
                        # The item is stored alongside its value in order to keep it alive for as long as
                        # the cache exists: ids are unique only among simultaneously existing objects, so
                        # without the reference, the id of a discarded item (e.g. one produced by a
                        # generator) could be reused by a different item, yielding a wrong cache hit.
                        entry = (x, self._f(x))
                        cache[key] = entry
                    return entry[1]

            values = [compute_value(x) for x in items]
            if self.transformer is not None and transform:
                values = self.transformer.transform(values)
            return values

    class ResultType(Enum):
        """
        The kinds of values that a vectoriser's value function may return
        """
        SCALAR = 0
        LIST = 1
        NUMPY_ARRAY = 2

        @classmethod
        def from_value(cls, y):
            """
            :param y: a value returned by a vectoriser's value function
            :return: the result type corresponding to the value
            :raises ValueError: if the value is neither a list, a scalar nor a numpy array
            """
            if isinstance(y, list):
                return cls.LIST
            elif np.isscalar(y):
                return cls.SCALAR
            elif isinstance(y, np.ndarray):
                return cls.NUMPY_ARRAY
            else:
                raise ValueError(f"Received unhandled value of type {type(y)}")

144 

145 

class EmptyVectoriser(Vectoriser):
    """
    A vectoriser which maps every item to a vector of length zero.
    """

    def __init__(self):
        empty_vector_fn = self._create_empty_vector
        super().__init__(empty_vector_fn)

    # noinspection PyUnusedLocal
    @staticmethod
    def _create_empty_vector(x):
        """
        :param x: the item (ignored)
        :return: an array without any elements
        """
        return np.zeros((0,))

154 

155 

class ItemIdentifierProvider(Generic[T], ABC):
    """
    Abstract base class for objects which supply a hashable identifier for an item of a sequence.
    """

    @abstractmethod
    def get_identifier(self, item: T) -> Hashable:
        """
        :param item: the item for which to obtain the identifier
        :return: the item's identifier
        """

163 

164 

class SequenceVectoriser(Generic[T], ToStringMixin):
    """
    Supports the application of Vectorisers to sequences of objects of some type T, where each object of type T is
    mapped to a vector (1D array) by the vectorisers.
    A SequenceVectoriser is fitted by fitting the underlying Vectorisers. In order to obtain the instances of T that
    are used for training, we take into consideration the fact that the sequences of T may overlap and thus training
    is (by default) performed on the set of unique instances.
    """

    log = log.getChild(__qualname__)

    class FittingMode(Enum):
        """
        Determines how the individual vectorisers are fitted based on several sequences of objects of type T that are given.
        If NONE, no fitting is performed, otherwise the mode determines how a single sequence of objects of type T for fitting
        is obtained from the collection of sequences: either by forming the collection of unique objects from the
        sequences (UNIQUE) or by concatenating all the sequences (CONCAT).
        """
        NONE = "none"  # no fitting is performed
        UNIQUE = "unique"  # use collection of unique items
        CONCAT = "concat"  # use collection obtained by concatenating all sequences using numpy.concatenate

    def __init__(self, vectorisers: Union[Sequence[Vectoriser[T]], Vectoriser[T]],
            fitting_mode: FittingMode = FittingMode.UNIQUE,
            unique_id_provider: Optional[ItemIdentifierProvider] = None,
            refit_vectorisers: bool = True):
        """
        :param vectorisers: zero or more vectorisers that are to be applied. If more than one vectoriser is supplied,
            vectors are generated from input instances of type T by concatenating the results of the vectorisers in
            the order the vectorisers are given.
        :param fitting_mode: the fitting mode for vectorisers. If `NONE`, no fitting takes place.
            If `UNIQUE`, fit vectorisers on unique set of items of type T. By default, the unique items are
            determined by adding all items to a set, i.e. uniqueness is based on the items' hash and equality
            (which, for classes that do not override them, amounts to Python object identity).
            If a custom mechanism for determining an item's identity is desired, pass `unique_id_provider`.
            If `CONCAT`, fit vectorisers based on all items of type T, concatenating them to a single sequence.
        :param unique_id_provider: an object used to determine item identities when using fitting mode `UNIQUE`.
        :param refit_vectorisers: whether any vectorisers that have previously been fitted shall be
            fitted once more when this sequence vectoriser is fitted. Set this to false if you are reusing vectorisers
            that are also part of another sequence vectoriser that will be fitted/has been fitted before this
            sequence vectoriser. This can be useful, in particular, in encoder-decoders where the target features
            are partly the same as the history sequence features, and we want to reuse the latter and their
            fitted transformers for the target features.
        """
        self.fittingMode = fitting_mode
        self.uniqueIdProvider = unique_id_provider
        self.refitVectorisers = refit_vectorisers
        if isinstance(vectorisers, Vectoriser):
            self.vectorisers = [vectorisers]
        else:
            self.vectorisers = vectorisers
        # ensure that there is always at least one vectoriser, such that vectors can be concatenated
        if len(self.vectorisers) == 0:
            self.vectorisers = [EmptyVectoriser()]

    def __setstate__(self, state):
        state["fittingMode"] = state.get("fittingMode", self.FittingMode.UNIQUE)
        setstate(SequenceVectoriser, self, state, new_optional_properties=["uniqueIdProvider"],
            new_default_properties={"refitVectorisers": True})

    def fit(self, data: Iterable[Sequence[T]]):
        """
        Fits the contained vectorisers on the items of the given sequences according to the fitting mode.
        Nothing is done if the fitting mode is `NONE` or if refitting is disabled and all vectorisers
        have already been fitted.

        :param data: the sequences whose items are to be used for fitting
        :raises ValueError: if the fitting mode is not supported
        """
        log.debug(f"Fitting {self}")

        if self.fittingMode == self.FittingMode.NONE:
            return

        if not self.refitVectorisers:
            if all(v.is_fitted() for v in self.vectorisers):
                log.debug("No vectorisers to be fitted; all contained vectorisers have previously been fitted")
                return

        # obtain items for fitting
        if self.fittingMode == self.FittingMode.UNIQUE:
            if self.uniqueIdProvider is None:
                # uniqueness is determined by the set, i.e. via the items' hash/equality
                items = set()
                for seq in data:
                    items.update(seq)
            else:
                # keep the first item encountered for each identifier
                items = []
                identifiers = set()
                for seq in data:
                    for item in seq:
                        identifier = self.uniqueIdProvider.get_identifier(item)
                        if identifier not in identifiers:
                            identifiers.add(identifier)
                            items.append(item)
        elif self.fittingMode == self.FittingMode.CONCAT:
            items = np.concatenate(data)  # type: ignore
        else:
            raise ValueError(self.fittingMode)

        for v in self.vectorisers:
            if self.refitVectorisers or not v.is_fitted():
                log.debug(f"Fitting {v}")
                v.fit(items)

    def apply(self, seq: Sequence[T], transform=True) -> List[np.ndarray]:
        """
        Applies vectorisation to the given sequence of objects

        :param seq: the sequence to vectorise
        :param transform: whether to apply any post-vectorisation transformers
        :return: a list containing one vector per item of the sequence, where each vector is the concatenation
            of the vectors generated by the individual vectorisers
        """
        vectors_list = []
        for item in seq:
            vectors = [vec.apply(item, transform=transform) for vec in self.vectorisers]
            conc = np.concatenate(vectors, axis=0)
            vectors_list.append(conc)
        return vectors_list

    def apply_multi(self, sequences: Iterable[Sequence[T]], use_cache=False, verbose=False) -> Tuple[List[List[np.ndarray]], List[int]]:
        """
        Applies this vectoriser to multiple sequences of objects of type T, where each sequence is mapped to a sequence
        of 1D arrays.
        This method can be significantly faster than multiple applications of apply, especially in cases where the vectorisers
        use transformers.

        :param sequences: the sequences to vectorise
        :param use_cache: whether to apply caching of the value functions of contained vectorisers (keeping track of outputs for
            each input object id), which can significantly speed up computation in cases where the given sequences contain individual
            items more than once
        :param verbose: whether to generate log messages
        :return: a pair (vl, l) where vl is a list of lists of vectors/arrays and l is a list of integers containing the lengths
            of the sequences
        """
        if verbose:
            self.log.info(f"Applying {self} (useCache={use_cache})")

        if verbose:
            self.log.info("Generating combined sequence")
        # The lengths and the combined sequence are collected in a single pass, such that
        # iterables which can be traversed only once (e.g. generators) are handled correctly.
        lengths = []
        combined_seq = []
        for seq in sequences:
            lengths.append(len(seq))
            combined_seq.extend(seq)

        # without any items, there is nothing to vectorise (and transformers may not accept empty input)
        if len(combined_seq) == 0:
            return [[] for _ in lengths], lengths

        individual_vectoriser_results = [vectoriser.apply_multi(combined_seq, use_cache=use_cache, verbose=verbose)
            for vectoriser in self.vectorisers]
        # for each item, concatenate the vectors of all the vectorisers
        conc_vectors = [np.concatenate(x, axis=0) for x in zip(*individual_vectoriser_results)]

        # split the vectors of the combined sequence into the original sequences
        vector_sequences = []
        idx_start = 0
        for length in lengths:
            vector_sequences.append(conc_vectors[idx_start:idx_start+length])
            idx_start += length

        return vector_sequences, lengths

    def apply_multi_with_padding(self, sequences: Sequence[Sequence[T]], use_cache=False, verbose=False) \
            -> Tuple[List[List[np.ndarray]], List[int]]:
        """
        Applies this vectoriser to multiple sequences of objects of type T, where each sequence is mapped to a sequence
        of 1D arrays.
        Sequences are allowed to vary in length. For shorter sequences, 0-vectors are appended until the maximum sequence length
        is reached (padding).

        :param sequences: the sequences to vectorise
        :param use_cache: whether to apply caching of the value functions of contained vectorisers (keeping track of outputs for
            each input object id), which can significantly speed up computation in cases where the given sequences contain individual
            items more than once
        :param verbose: whether to generate log messages
        :return: a pair (vl, l) where vl is a list of lists of vectors/arrays, each list having the same length, and l is a list of
            integers containing the original unpadded lengths of the sequences
        """
        result, lengths = self.apply_multi(sequences, use_cache=use_cache, verbose=verbose)
        if verbose:
            self.log.info("Applying padding")
        max_length = max(lengths, default=0)
        # The vector dimension is taken from the first vector that is available, as the first sequence
        # may be empty. If there are no vectors at all, all sequences are empty and no padding is required.
        first_vector = next((seq[0] for seq in result if len(seq) > 0), None)
        if first_vector is not None:
            dummy_vec = np.zeros((len(first_vector),))
            for seq in result:
                seq.extend([dummy_vec] * (max_length - len(seq)))
        return result, lengths

    def get_vector_dim(self, seq: Sequence[T]):
        """
        Determines the dimensionality of generated vectors by applying the vectoriser to the given sequence

        :param seq: the sequence (which must contain at least one item)
        :return: the number of dimensions in generated output vectors (per item)
        """
        return len(self.apply(seq, transform=False)[0])

346 

347 

class VectoriserRegistry:
    """
    Holds vectoriser factories under (hashable) names and creates named vectorisers from them.
    """

    def __init__(self):
        self._factories: Dict[Hashable, Callable[[Callable], Vectoriser]] = {}

    def get_available_vectorisers(self):
        """
        :return: the list of names under which factories have been registered
        """
        return [*self._factories]

    @staticmethod
    def _name(name: Hashable):
        # enum items (and other objects with a .name attribute) are represented by that attribute only,
        # because it is less problematic to persist
        return getattr(name, "name", name)

    def register_factory(self, name: Hashable, factory: Callable[[Callable], Vectoriser],
            additional_names: Optional[Iterable[Hashable]] = None):
        """
        Makes a vectoriser factory available under the given name (and, optionally, further names)

        :param name: the primary name (e.g. a string or an enum item)
        :param factory: the factory, a callable receiving the default transformer factory as its argument
        :param additional_names: (optional) further names under which the same factory shall be registered
        """
        aliases = () if additional_names is None else additional_names
        self._register_factory(name, factory)
        for alias in aliases:
            self._register_factory(alias, factory)

    def _register_factory(self, name: Hashable, factory):
        key = self._name(name)
        if key in self._factories:
            raise ValueError(f"Vectoriser factory for name '{key}' already registered")
        self._factories[key] = factory

    def get_vectoriser(self, name: Hashable, default_transformer_factory: Callable) -> Vectoriser:
        """
        Instantiates the vectoriser whose factory was registered under the given name and
        assigns that name to the new instance.

        :param name: the name (e.g. a string or an enum item) under which the factory was registered
        :param default_transformer_factory: the default transformer factory, which is passed to the vectoriser factory
        :return: a new vectoriser instance
        """
        key = self._name(name)
        factory = self._factories.get(key)
        if factory is not None:
            vectoriser = factory(default_transformer_factory)
            vectoriser.set_name(key)
            return vectoriser
        raise ValueError(f"No vectoriser factory registered for name '{key}': known names: {list_string(self._factories.keys())}. "
                         f"Register the factory first.")

    def get_vectorisers(self, names: List[Hashable], default_transformer_factory: Callable) -> List[Vectoriser]:
        """
        :param names: the names of the vectorisers to create
        :param default_transformer_factory: the default transformer factory
        :return: the list of new vectoriser instances, in the order of the given names
        """
        vectorisers = []
        for name in names:
            vectorisers.append(self.get_vectoriser(name, default_transformer_factory))
        return vectorisers