Coverage for src/sensai/util/cache_azure.py: 0%

407 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-29 18:29 +0000

1from __future__ import annotations 

2import collections 

3import functools 

4import pickle 

5import sys 

6from abc import ABC, abstractmethod 

7from concurrent.futures.thread import ThreadPoolExecutor 

8from typing import Callable, Dict, Union, Any, List, Sequence, Optional 

9import json 

10import logging 

11import re 

12import threading 

13 

14 

15from azure.storage.table import TableService, TableBatch, Entity 

16from azure.storage.blob import BlockBlobService 

17import pandas as pd 

18import numpy as np 

19 

20from .cache import PersistentKeyValueCache, PeriodicUpdateHook 

21 

22_log = logging.getLogger(__name__) 

23 

24 

class Serialiser(ABC):
    """
    Interface for mechanisms that convert values which do not fit the table storage data model
    to strings and back, see
    https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-the-table-service-data-model
    """

    @abstractmethod
    def serialise(self, value) -> str:
        """
        :param value: the value to convert
        :return: the string representation of the value
        """

    @abstractmethod
    def deserialise(self, value: str):
        """
        :param value: a string representation of a value
        :return: the value reconstructed from the string
        """

38 

39 

class NumpyArrayJsonSerialiser(Serialiser):
    """
    Serialiser which represents a numpy array as the JSON encoding of its (nested) list form
    """

    def serialise(self, value: np.ndarray) -> str:
        """
        :param value: the array to convert
        :return: the JSON string of ``value.tolist()``
        """
        nested_list = value.tolist()
        return json.dumps(nested_list)

    def deserialise(self, value: str):
        """
        :param value: a JSON string encoding a (nested) list
        :return: a numpy array created from the decoded list
        """
        decoded = json.loads(value)
        return np.array(decoded)

50 

51 

class PropertyLoader(ABC):
    """
    Interface of a customised loader for a single entity property
    """

    @abstractmethod
    def load_property_value(self, entity: Entity):
        """
        Hook applied to an entity after it was read.

        :param entity: the entity to process
        """

    @abstractmethod
    def write_property_value(self, entity: Entity):
        """
        Hook applied to an entity before it is written.

        :param entity: the entity to process
        """

    @abstractmethod
    def load_property_value_to_data_frame_column(self, df: pd.DataFrame):
        """
        Hook applied to a data frame of entities after it was read.

        :param df: the data frame to process
        """

68 

69 

class SerialisedPropertyLoader(PropertyLoader):
    """
    PropertyLoader which serialises a property on write and de-serialises it on load. Useful if the type of
    the values is not aligned with the table storage data model,
    see https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-the-table-service-data-model
    """
    def __init__(self, property_name: str, serialiser: Serialiser):
        """
        :param property_name: name of the property to (de-)serialise
        :param serialiser: the serialiser applied to the property's value
        """
        self.property_name = property_name
        self.serialiser = serialiser

    def load_property_value(self, entity: Entity):
        """
        Replaces the serialised property value in the entity by its de-serialised form (in place).
        """
        serialised_value = entity[self.property_name]
        entity[self.property_name] = self.serialiser.deserialise(serialised_value)

    def write_property_value(self, entity: Entity):
        """
        Replaces the property value in the entity by its serialised form (in place).
        """
        raw_value = entity[self.property_name]
        entity[self.property_name] = self.serialiser.serialise(raw_value)

    def load_property_value_to_data_frame_column(self, df: pd.DataFrame):
        """
        De-serialises all values of the property's column in place; does nothing if the column is absent.
        """
        if self.property_name not in df.columns:
            return
        df.loc[:, self.property_name] = list(map(self.serialiser.deserialise, df[self.property_name]))

88 

89 

class AzureTableBlobBackend(ABC):
    """
    Interface of a blob backend, which allows values held in blob storage to be set and retrieved
    conveniently via a reference to the value
    """

    @abstractmethod
    def get_value_from_reference(self, value_identifier: str):
        """
        :param value_identifier: the reference to the value
        :return: the value the reference points to
        """

    @abstractmethod
    def get_value_reference(self, partition_key: str, row_key: str, value_name: str, blob_name_prefix: str = None) -> str:
        """
        :param partition_key: partition key of the entity the value belongs to
        :param row_key: row key of the entity the value belongs to
        :param value_name: name of the value
        :param blob_name_prefix: optional prefix for the blob name
        :return: the reference under which the value is to be stored
        """

    @abstractmethod
    def set_value_for_reference(self, value_identifier: str, value):
        """
        :param value_identifier: the reference to the value
        :param value: the value to store
        """

107 

108 

class BlobPerKeyAzureTableBlobBackend(AzureTableBlobBackend, ABC):
    """
    Backend which keeps one blob per value, named /tableName/partitionKey/rowKey/valueName.<fileExtension>
    or /tableName/rowKey/valueName.<fileExtension>, if partitionKey equals tableName
    """

    def __init__(self, block_blob_service: BlockBlobService, container_name: str):
        """
        :param block_blob_service: https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blockblobservice.blockblobservice?view=azure-python-previous
        :param container_name: name of the container to use; it is created if the service does not list it yet
        """
        self.block_blob_service = block_blob_service
        self.container_name = container_name
        self.container_list = [existing.name for existing in block_blob_service.list_containers()]
        container_is_missing = container_name not in self.container_list
        if container_is_missing:
            self.block_blob_service.create_container(container_name)
            self.container_list.append(container_name)

    @property
    @abstractmethod
    def file_extension(self):
        """
        :return: the suffix appended to every blob name
        """

    @abstractmethod
    def _get_blob_value(self, container_name, blob_name):
        """
        :return: the value read from the given blob
        """

    @abstractmethod
    def _write_value_to_blob(self, container_name, blob_name, value):
        """
        Writes the value to the given blob
        """

    def get_value_from_reference(self, value_identifier: str):
        """
        :param value_identifier: reference (blob URL) of the value
        :return: the value read from the referenced blob
        """
        return self._get_blob_value(self._get_container_name_from_identifier(value_identifier),
            self._get_blob_name_from_identifier(value_identifier))

    def get_value_reference(self, partition_key: str, row_key: str, value_name: str, blob_name_prefix: str = None) -> str:
        """
        :return: the blob URL (as created by the blob service) for the blob name derived from the given keys
        """
        name_of_blob = self._get_blob_name_from_keys(partition_key, row_key, value_name, blob_prefix=blob_name_prefix)
        return self.block_blob_service.make_blob_url(self.container_name, name_of_blob)

    def set_value_for_reference(self, value_identifier: str, value):
        """
        :param value_identifier: reference (blob URL) of the value
        :param value: the value to write to the referenced blob
        """
        self._write_value_to_blob(self._get_container_name_from_identifier(value_identifier),
            self._get_blob_name_from_identifier(value_identifier), value)

    def _get_blob_name_from_identifier(self, value_identifier: str):
        # everything after "<primary endpoint>/<container>/" is the blob name
        _, _, path_after_endpoint = value_identifier.partition(f"{self.block_blob_service.primary_endpoint}/")
        _, _, name_of_blob = path_after_endpoint.partition("/")
        return name_of_blob

    def _get_container_name_from_identifier(self, value_identifier: str):
        # the first path segment after "<primary endpoint>/" is the container name
        _, _, path_after_endpoint = value_identifier.partition(f"{self.block_blob_service.primary_endpoint}/")
        name_of_container, _, _ = path_after_endpoint.partition("/")
        return name_of_container

    def _get_blob_name_from_keys(self, partition_key: str, row_key: str, value_name: str, blob_prefix: str = None):
        path_segments = [partition_key, row_key, value_name]
        # the prefix is only prepended if it adds information, i.e. if it differs from the partition key
        if blob_prefix is not None and blob_prefix != partition_key:
            path_segments.insert(0, blob_prefix)
        return "/".join(path_segments) + self.file_extension

165 

166 

class TextDumpAzureTableBlobBackend(BlobPerKeyAzureTableBlobBackend):
    """
    Backend which stores values as text blobs in the structure /tableName/partitionKey/rowKey/valueName
    (no file extension is appended)
    """

    @property
    def file_extension(self):
        return ""

    def _get_blob_value(self, container_name, blob_name):
        text_blob = self.block_blob_service.get_blob_to_text(container_name, blob_name)
        return text_blob.content

    def _write_value_to_blob(self, container_name, blob_name, value):
        self.block_blob_service.create_blob_from_text(container_name, blob_name, value)

181 

182 

class JsonAzureTableBlobBackend(BlobPerKeyAzureTableBlobBackend):
    """
    Backend which stores values as json blobs in the structure /tableName/partitionKey/rowKey/valueName.json
    """

    @property
    def file_extension(self):
        return ".json"

    def _get_blob_value(self, container_name, blob_name):
        bytes_blob = self.block_blob_service.get_blob_to_bytes(container_name, blob_name)
        # decoding goes through self, such that subclasses can replace the encoding scheme
        return self._decode_bytes_to_value(bytes_blob.content)

    def _write_value_to_blob(self, container_name, blob_name, value):
        # encoding goes through self, such that subclasses can replace the encoding scheme
        payload = self._encode_value_to_bytes(value)
        self.block_blob_service.create_blob_from_bytes(container_name, blob_name, payload)

    @staticmethod
    def _encode_value_to_bytes(value):
        json_string = json.dumps(value)
        return json_string.encode()

    @staticmethod
    def _decode_bytes_to_value(_bytes):
        json_string = _bytes.decode()
        return json.loads(json_string)

207 

208 

class PickleAzureTableBlobBackend(JsonAzureTableBlobBackend):
    """
    Backend which stores values as pickle blobs in the structure /tableName/partitionKey/rowKey/valueName.pickle

    Reading and writing of blobs is inherited; only the file extension and the byte encoding are replaced.
    """

    @property
    def file_extension(self):
        return ".pickle"

    @staticmethod
    def _encode_value_to_bytes(value):
        pickled = pickle.dumps(value)
        return pickled

    @staticmethod
    def _decode_bytes_to_value(_bytes):
        # NOTE(review): unpickling can execute arbitrary code; only use this backend with trusted blob storage
        value = pickle.loads(_bytes)
        return value

225 

226 

class BlobBackedPropertyLoader(PropertyLoader):
    """
    PropertyLoader to write and read values from blob backend via a reference to the value. Useful, if values cannot
    be stored in table storage itself, due to not being aligned with table storage data model,
    see https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-the-table-service-data-model
    """
    # thresholds used by _need_to_write_to_blob to decide whether a value is moved to blob storage
    AZURE_ALLOWED_SIZE_PER_PROPERTY_BYTES = 64000
    AZURE_ALLOWED_STRING_LENGTH_PER_PROPERTY = 32000

    def __init__(self, property_name: str, blob_backend: AzureTableBlobBackend, blob_prefix: str = None,
            property_boolean_blob_status_name: str = None, max_workers=None):
        """
        :param property_name: name of property in table
        :param property_boolean_blob_status_name: name of property representing a boolean flag within a table, which indicates, if value is
            blob backed. If None, each value is assumed to be blob backed.
        :param blob_backend: actual backend to use for storage
        :param blob_prefix: prefix to use for blob in storage, e.g. a table name
        :param max_workers: maximal number of workers to load data from blob storage
        """
        self.blob_prefix = blob_prefix
        self.property_blob_status_name = property_boolean_blob_status_name
        self.blob_backend = blob_backend
        self.max_workers = max_workers
        self.propertyName = property_name

    def load_property_value(self, entity: Entity):
        """
        Replaces (in place) the reference stored in the entity's property by the referenced value,
        provided that the value is blob backed.

        :param entity: the entity that was read from the table
        """
        if self._is_entity_value_blob_backed(entity):
            entity[self.propertyName] = self.blob_backend.get_value_from_reference(entity[self.propertyName])

    def write_property_value(self, entity: Entity):
        """
        Moves the entity's property value to blob storage if required, replacing it (in place) by its reference,
        and records the decision in the blob status property (if a status property name is configured).
        Entities which do not contain the property are left untouched.

        :param entity: the entity about to be written to the table
        """
        if self.propertyName not in entity.keys():
            return
        write_to_blob = self._need_to_write_to_blob(entity[self.propertyName])
        if write_to_blob:
            value_identifier = self.blob_backend.get_value_reference(entity["PartitionKey"], entity["RowKey"], self.propertyName,
                blob_name_prefix=self.blob_prefix)
            self.blob_backend.set_value_for_reference(value_identifier, entity[self.propertyName])
            entity[self.propertyName] = value_identifier
        if self.property_blob_status_name is not None:
            entity[self.property_blob_status_name] = write_to_blob

    def load_property_value_to_data_frame_column(self, df: pd.DataFrame):
        """
        Replaces (in place) the references in the property's column by the referenced values.
        Does nothing if the data frame has no column for the property.

        :param df: data frame of entities read from the table
        """
        if self.propertyName not in df.columns:
            return
        status_name = self.property_blob_status_name
        # A missing status column is treated like a missing status flag in _is_entity_value_blob_backed:
        # all values are assumed to be blob backed (instead of failing with a KeyError).
        if status_name is None or status_name not in df.columns:
            df.loc[:, self.propertyName] = self._load_values_in_series(df[self.propertyName])
        else:
            is_blob_backed = df[status_name]
            df.loc[is_blob_backed, self.propertyName] = \
                self._load_values_in_series(df.loc[is_blob_backed, self.propertyName])

    def _need_to_write_to_blob(self, value):
        # without a status flag there is no way to mark inline values, so every value goes to blob storage
        if self.property_blob_status_name is None:
            return True
        # NOTE: sys.getsizeof is shallow, i.e. the size of objects referenced by a container is not included
        if sys.getsizeof(value) > self.AZURE_ALLOWED_SIZE_PER_PROPERTY_BYTES:
            return True
        if isinstance(value, str) and len(value) > self.AZURE_ALLOWED_STRING_LENGTH_PER_PROPERTY:
            return True
        return False

    def _is_entity_value_blob_backed(self, entity: Entity):
        if self.propertyName not in entity.keys():
            return False
        # no status flag available: assume that the value is blob backed
        if self.property_blob_status_name is None or self.property_blob_status_name not in entity:
            return True
        return entity[self.property_blob_status_name]

    def _load_values_in_series(self, _series: pd.Series):
        # references are resolved concurrently; executor.map preserves the order of the input
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            _series = list(executor.map(self.blob_backend.get_value_from_reference, _series))
        return _series

299 

300 

class BlobBackedSerialisedPropertyLoader(BlobBackedPropertyLoader, SerialisedPropertyLoader):
    """
    Property loader, which combines serialisation and blob backing.

    On write, the value is serialised first and the serialised form is then handed to the blob-backing logic;
    on load, the value is first retrieved via the blob-backing logic and then de-serialised.

    The base class implementations are called explicitly: ``super(X, self)`` resolves to the class *following* X
    in the method resolution order, so ``super(BlobBackedPropertyLoader, self)`` would dispatch to
    :class:`SerialisedPropertyLoader` and ``super(SerialisedPropertyLoader, self)`` to the abstract no-op of
    :class:`PropertyLoader`, i.e. the blob backend would never be used.
    """
    def __init__(self, property_name, serialiser: Serialiser, blob_backend: AzureTableBlobBackend, blob_prefix: str = None,
            property_boolean_blob_status_name: str = None, max_workers=None):
        """
        :param property_name: name of property in table
        :param serialiser: serialiser applied to the value before it is written and after it was loaded
        :param property_boolean_blob_status_name: name of property representing a boolean flag within a table, which indicates, if value is
            blob backed. If None, each value is assumed to be blob backed.
        :param blob_backend: actual backend to use for storage
        :param blob_prefix: prefix to use for blob in storage, e.g. a table name
        :param max_workers: maximal number of workers to load data from blob storage
        """
        SerialisedPropertyLoader.__init__(self, property_name, serialiser)
        BlobBackedPropertyLoader.__init__(self, property_name, blob_backend, blob_prefix, property_boolean_blob_status_name, max_workers)

    def load_property_value(self, entity: Entity):
        """
        Resolves the blob reference (if the value is blob backed) and de-serialises the resulting value, in place.
        """
        BlobBackedPropertyLoader.load_property_value(self, entity)
        SerialisedPropertyLoader.load_property_value(self, entity)

    def write_property_value(self, entity: Entity):
        """
        Serialises the value and moves the serialised form to blob storage if required, in place.
        """
        SerialisedPropertyLoader.write_property_value(self, entity)
        BlobBackedPropertyLoader.write_property_value(self, entity)

    def load_property_value_to_data_frame_column(self, df: pd.DataFrame):
        """
        Resolves blob references in the property's column and de-serialises the resulting values, in place.
        """
        BlobBackedPropertyLoader.load_property_value_to_data_frame_column(self, df)
        SerialisedPropertyLoader.load_property_value_to_data_frame_column(self, df)

332 

333 

class AzureLazyBatchCommitTable:
    """
    Wrapper for an Azure table, which allow for convenient insertion via lazy batch execution per partition.
    Uses a priority queue to manage order of partitions to be committed.
    To execute insertions, call one of the commit methods, e.g. :meth:`commit_blocking_until_empty`,
    :meth:`commit_non_blocking_current_queue_state` or :meth:`commit_blocking_largest_partition_from_queue`.
    """

    AZURE_ALLOWED_TABLE_NAME_PATTERN = re.compile("^[A-Za-z][A-Za-z0-9]{2,62}$")
    AZURE_ALLOWED_TABLE_BATCH_SIZE = 100

    class PartitionCommandsPriorityQueue:
        """
        Thread-safe collection of pending commands, grouped by partition key, where the partition with the
        largest number of pending commands is popped first.
        """

        class PartitionCommands:
            """
            The pending commands of a single partition, in insertion order.
            """
            def __init__(self, partition_key):
                self.partition_key = partition_key
                self._command_list = collections.deque()

            def __len__(self):
                return len(self._command_list)

            def append(self, command):
                self._command_list.append(command)

            def execute(self, context_manager: Callable[[], TableBatch], batch_size: int):
                """
                Executes all pending commands in batches.

                :param context_manager: factory of a context manager yielding the batch the commands are applied to
                :param batch_size: maximal number of commands per batch; must be positive
                :raises ValueError: if batch_size is not positive (the loop below would never terminate)
                """
                if batch_size < 1:
                    raise ValueError(f"batch_size must be positive, got {batch_size}")
                while self._command_list:
                    num_commands = min(batch_size, len(self._command_list))
                    _slice = [self._command_list.popleft() for _ in range(num_commands)]
                    _log.info(f"Committing {len(_slice)} cache entries to the database")
                    with context_manager() as batch:
                        for command in _slice:
                            command(batch)

        def __init__(self):
            self.partition_commands_queue = []
            self.partition_key2_commands = {}
            self._thread_lock = threading.Lock()

        def add_command(self, partition_key, command: Union[Callable[[TableBatch], Any], functools.partial[TableBatch]]):
            """
            Add a command to queue of corresponding partitionKey

            :param partition_key: the partition the command belongs to
            :param command: a callable on a TableBatch
            """
            with self._thread_lock:
                commands = self.partition_key2_commands.get(partition_key)
                if commands is None:
                    commands = self.PartitionCommands(partition_key)
                    self.partition_commands_queue.append(commands)
                    self.partition_key2_commands[partition_key] = commands
                commands.append(command)

        def pop(self, min_length: int = None) -> Optional[AzureLazyBatchCommitTable.PartitionCommandsPriorityQueue.PartitionCommands]:
            """
            :param min_length: minimal length of largest PartitionCommands for the pop to take place.
            :return: largest PartitionCommands or None if minimal length is not reached (or the queue is empty)
            """
            with self._thread_lock:
                return self._pop(min_length)

        def pop_all(self):
            """
            :return: list of all PartitionCommands (largest first); the queue is empty afterwards
            """
            with self._thread_lock:
                commands_list = []
                while not self._is_empty():
                    commands_list.append(self._pop())
                return commands_list

        def is_empty(self):
            with self._thread_lock:
                return self._is_empty()

        def _pop(self, min_length=None):
            # must be called with the lock held
            length, index = self._get_max_priority_info()
            if index is None:
                return None
            if min_length is not None and length < min_length:
                return None
            q = self.partition_commands_queue.pop(index)
            del self.partition_key2_commands[q.partition_key]
            return q

        def _is_empty(self):
            return len(self.partition_commands_queue) == 0

        def _get_max_priority_info(self):
            """
            :return: pair (length, index) of the largest PartitionCommands (first one in case of ties),
                or (0, None) if the queue is empty
            """
            lengths_list = [len(commands) for commands in self.partition_commands_queue]
            if not lengths_list:
                return 0, None
            max_length = max(lengths_list)
            return max_length, lengths_list.index(max_length)

    def __init__(self, table_name: str, table_service: TableService, property_loaders: Sequence[PropertyLoader] = ()):
        """
        :param table_name: name of table; the table is created if it does not exist yet
        :param table_service: instance of :class:`azure.storage.table.TableService` to connect to Azure table storage
        :param property_loaders: loaders applied to entities before they are written and after they were read
        :raises ValueError: if the table name does not match the pattern allowed by Azure
        """
        if not self.AZURE_ALLOWED_TABLE_NAME_PATTERN.match(table_name):
            raise ValueError(f"Invalid table name {table_name}, see: "
                             f"https://docs.microsoft.com/en-us/rest/api/storageservices/Understanding-the-Table-Service-Data-Model")

        self.table_service = table_service
        self.table_name = table_name
        self.property_loaders = property_loaders
        self._partition_queues = self.PartitionCommandsPriorityQueue()
        self._context_manager = functools.partial(self.table_service.batch, self.table_name)

        if not self.exists():
            self.table_service.create_table(self.table_name)

    def insert_or_replace_entity(self, entity: Union[Dict, Entity]):
        """
        Lazy wrapper method for :func:`azure.storage.table.TableService.insert_or_replace_entity`

        :param entity: the entity to insert; it is modified in place by the property loaders
        """
        self._enqueue(entity, self._insert_or_replace_entity_via_batch)

    def insert_entity(self, entity: Union[Dict, Entity]):
        """
        Lazy wrapper method for :func:`azure.storage.table.TableService.insert_entity`

        :param entity: the entity to insert; it is modified in place by the property loaders
        """
        self._enqueue(entity, self._insert_entity_via_batch)

    def _enqueue(self, entity, batch_operation):
        # Applies the property loaders to the entity and queues the batch operation for the entity's partition.
        # The partition key is read first, such that an invalid entity fails before any loader has side effects.
        partition_key = entity["PartitionKey"]
        for property_loader in self.property_loaders:
            property_loader.write_property_value(entity)
        self._partition_queues.add_command(partition_key, functools.partial(batch_operation, entity))

    def get_entity(self, partition_key: str, row_key: str) -> Optional[Entity]:
        """
        Wraps :func:`azure.storage.table.TableService.get_entity`

        :param partition_key: partition key of the entity
        :param row_key: row key of the entity
        :return: the entity (after application of the property loaders) or None if it could not be loaded
        """
        # deliberately best-effort: any failure (including a missing entity) is reported as "no value"
        try:
            entity = self.table_service.get_entity(self.table_name, partition_key, row_key)
            for property_loader in self.property_loaders:
                property_loader.load_property_value(entity)
            return entity
        except Exception as e:
            _log.debug(f"Unable to load value for partitionKey {partition_key} and rowKey {row_key} from table {self.table_name}: {e}")
            return None

    def commit_blocking_until_empty(self, max_batch_size=AZURE_ALLOWED_TABLE_BATCH_SIZE):
        """
        Commit insertion commands. Commands are executed batch-wise per partition until partition queue is empty in a
        blocking manner.

        :param max_batch_size: maximal batch size to use for batch insertion, must be less or equal to batch size allowed by Azure
        """
        max_batch_size = self._validate_max_batch_size(max_batch_size)

        # pop() returns None exactly if the queue is empty; relying on its result (rather than on a separate
        # is_empty() check) avoids a failure if another thread empties the queue in between
        while True:
            commands = self._partition_queues.pop()
            if commands is None:
                break
            commands.execute(self._context_manager, max_batch_size)

    def commit_non_blocking_current_queue_state(self, max_batch_size=AZURE_ALLOWED_TABLE_BATCH_SIZE):
        """
        Commit insertion commands. Empties the current PartitionCommandsQueue in a non blocking way.
        Commands are executed batch-wise per partition.

        :param max_batch_size: maximal batch size to use for batch insertion, must be less or equal to batch size allowed by Azure
        """
        max_batch_size = self._validate_max_batch_size(max_batch_size)

        def commit():
            for commands in self._partition_queues.pop_all():
                commands.execute(self._context_manager, max_batch_size)

        thread = threading.Thread(target=commit, daemon=False)
        thread.start()

    def commit_blocking_largest_partition_from_queue(self, max_batch_size=AZURE_ALLOWED_TABLE_BATCH_SIZE, min_length=None):
        """
        Commits in a blocking way the largest partition from PartitionCommandsQueue

        :param max_batch_size: maximal batch size to use for batch insertion, must be less or equal to batch size allowed by Azure
        :param min_length: minimal size of largest partition. If not None, pop and commit only if minLength is reached.
        """
        max_batch_size = self._validate_max_batch_size(max_batch_size)
        commands = self._partition_queues.pop(min_length)
        if commands is not None:
            commands.execute(self._context_manager, max_batch_size)

    def _validate_max_batch_size(self, max_batch_size):
        # batch sizes beyond the allowed size are clipped (with a warning) rather than rejected
        if max_batch_size > self.AZURE_ALLOWED_TABLE_BATCH_SIZE:
            _log.warning(f"Provided maxBatchSize is larger than allowed size {self.AZURE_ALLOWED_TABLE_BATCH_SIZE}. "
                         f"Will use maxBatchSize {self.AZURE_ALLOWED_TABLE_BATCH_SIZE} instead.")
            max_batch_size = self.AZURE_ALLOWED_TABLE_BATCH_SIZE
        return max_batch_size

    def load_table_to_data_frame(self, columns: List[str] = None, row_filter_query: str = None, num_records: int = None):
        """
        Load all rows of table to :class:`~pandas.DataFrame`

        :param row_filter_query: filter passed on to the table service's entity query
        :param num_records: if not None, loading stops as soon as this number of records has been reached
        :param columns: restrict loading to provided columns
        :return: :class:`~pandas.DataFrame`
        """
        if num_records is None:
            records = list(self._iter_records(columns, row_filter_query))
        else:
            records = []
            for record in self._iter_records(columns, row_filter_query):
                records.append(record)
                if len(records) >= num_records:
                    break
        return self._records_to_data_frame(records, columns)

    def iter_data_frame_chunks(self, chunk_size: int, columns: List[str] = None, row_filter_query: str = None):
        """
        Get a generator of dataframe chunks. Every chunk contains chunk_size records, except for the last one,
        which contains the remaining records (if any).

        :param row_filter_query: filter passed on to the table service's entity query
        :param chunk_size: number of records per chunk
        :param columns: restrict loading to provided columns
        :return: generator of :class:`~pandas.DataFrame`
        """
        records = []
        for record in self._iter_records(columns, row_filter_query):
            records.append(record)
            if len(records) >= chunk_size:
                yield self._records_to_data_frame(records, columns)
                records = []
        # do not drop the records which did not fill a complete chunk
        if records:
            yield self._records_to_data_frame(records, columns)

    def _records_to_data_frame(self, records, columns):
        # Creates a data frame from the records and applies all property loaders to it.
        df = pd.DataFrame(records, columns=columns)
        for property_loader in self.property_loaders:
            property_loader.load_property_value_to_data_frame_column(df)
        return df

    def iter_records(self, columns: List[str] = None, row_filter_query: str = None):
        """
        Get a generator of table entities

        :param row_filter_query: filter passed on to the table service's entity query
        :param columns: restrict loading to provided columns
        :return: generator of entities, to which the property loaders have been applied
        """
        for entity in self._iter_records(columns, row_filter_query):
            for property_loader in self.property_loaders:
                property_loader.load_property_value(entity)
            yield entity

    def _iter_records(self, columns: Optional[List[str]], row_filter_query: Optional[str]):
        # the table service expects the selected columns as a comma-separated string
        column_names_as_comma_separated_string = None if columns is None else ",".join(columns)
        return self.table_service.query_entities(self.table_name, select=column_names_as_comma_separated_string,
            filter=row_filter_query)

    def insert_data_frame_to_table(self, df: pd.DataFrame, partition_key_generator: Callable[[str], str] = None, num_records: int = None):
        """
        Inserts or replace entities of the table corresponding to rows of the DataFrame, where the index of the dataFrame acts as rowKey.
        Values of object type columns in the dataFrame may have to be serialised via json beforehand.
        The insertions are lazy, i.e. they take effect when one of the commit methods is called.

        :param df: DataFrame to be inserted
        :param partition_key_generator: if None, partitionKeys default to tableName
        :param num_records: restrict insertion to first numRecords rows, merely for testing
        """
        for (count, (idx, row)) in enumerate(df.iterrows()):
            if num_records is not None and count >= num_records:
                break
            entity = row.to_dict()
            entity["RowKey"] = idx
            entity["PartitionKey"] = self.table_name if partition_key_generator is None else partition_key_generator(idx)
            self.insert_or_replace_entity(entity)

    @staticmethod
    def _insert_or_replace_entity_via_batch(entity, batch: TableBatch):
        return batch.insert_or_replace_entity(entity)

    @staticmethod
    def _insert_entity_via_batch(entity, batch: TableBatch):
        return batch.insert_entity(entity)

    def exists(self):
        """
        :return: the table service's answer to whether the table exists
        """
        return self.table_service.exists(self.table_name)

614 

615 

class AzureTablePersistentKeyValueCache(PersistentKeyValueCache):
    """
    PersistentKeyValueCache using Azure Table Storage, see https://docs.microsoft.com/en-gb/azure/storage/tables/
    """
    CACHE_VALUE_IDENTIFIER = "cache_value"

    def __init__(self, table_service: TableService, table_name="cache", partition_key_generator: Callable[[str], str] = None,
            max_batch_size=100, min_size_for_periodic_commit: Optional[int] = 100, deferred_commit_delay_secs=1.0, in_memory=False,
            blob_backend: AzureTableBlobBackend = None, serialiser: Serialiser = None, max_workers: int = None):
        """
        :param table_service: https://docs.microsoft.com/en-us/python/api/azure-cosmosdb-table/azure.cosmosdb.table.tableservice.tableservice?view=azure-python
        :param table_name: name of table, needs to match restrictions for Azure storage resources, see https://docs.microsoft.com/en-gb/azure/azure-resource-manager/management/resource-name-rules
        :param partition_key_generator: callable to generate a partitionKey from provided string, if None partitionKey in requests defaults
            to tableName
        :param max_batch_size: maximal batch size for each commit.
        :param deferred_commit_delay_secs: the time frame during which no new data must be added for a pending transaction to be committed
        :param min_size_for_periodic_commit: minimal size of a batch to be committed in a periodic thread.
            If None, commits are only executed in a deferred manner, i.e. commit only if there is no update for `deferred_commit_delay_secs`
        :param in_memory: boolean flag, to indicate, if table should be loaded in memory at construction
        :param blob_backend: if not None, blob storage will be used to store actual value and cache_value in table only contains a reference
        :param serialiser: if not None, values are serialised with it before being stored and de-serialised after being loaded
        :param max_workers: maximal number of workers to load data from blob backend
        """
        self._deferredCommitDelaySecs = deferred_commit_delay_secs
        self._partitionKeyGenerator = partition_key_generator

        # name of the boolean table property which flags blob-backed values (only relevant with a blob backend)
        property_blob_status_name = None if blob_backend is None else self.CACHE_VALUE_IDENTIFIER + "_blob_backed"

        if blob_backend is None:
            if serialiser is None:
                property_loaders = ()
            else:
                property_loaders = (SerialisedPropertyLoader(self.CACHE_VALUE_IDENTIFIER, serialiser),)
        elif serialiser is None:
            property_loaders = (BlobBackedPropertyLoader(self.CACHE_VALUE_IDENTIFIER, blob_backend, table_name,
                property_blob_status_name, max_workers),)
        else:
            property_loaders = (BlobBackedSerialisedPropertyLoader(self.CACHE_VALUE_IDENTIFIER, serialiser, blob_backend,
                table_name, property_blob_status_name, max_workers),)

        self._batch_commit_table = AzureLazyBatchCommitTable(table_name, table_service, property_loaders=property_loaders)
        self._minSizeForPeriodicCommit = min_size_for_periodic_commit
        self._maxBatchSize = max_batch_size
        self._updateHook = PeriodicUpdateHook(deferred_commit_delay_secs, no_update_fn=self._commit, periodic_fn=self._periodically_commit)

        self._in_memory_cache = None

        if in_memory:
            columns = ['RowKey', self.CACHE_VALUE_IDENTIFIER]
            # The blob status column must be loaded as well: the blob-backed property loader indexes the
            # data frame by it in order to determine which values are references to be resolved.
            if property_blob_status_name is not None:
                columns.append(property_blob_status_name)
            df = self._batch_commit_table.load_table_to_data_frame(columns=columns).set_index("RowKey")
            _log.info(f"Loaded {len(df)} entries of table {table_name} in memory")
            self._in_memory_cache = df[self.CACHE_VALUE_IDENTIFIER].to_dict()

    def set(self, key, value):
        """
        Queues the value for (lazy) insertion into the table under the string representation of the key
        and updates the in-memory cache (if enabled).

        :param key: the key; its string representation is used as the row key
        :param value: the value to store
        """
        key_as_string = str(key)
        partition_key = self._get_partition_key_for_row_key(key_as_string)
        # NOTE: the entity is modified in place by the table's property loaders, the value itself is not rebound
        entity = {'PartitionKey': partition_key, 'RowKey': key_as_string, self.CACHE_VALUE_IDENTIFIER: value}
        self._batch_commit_table.insert_or_replace_entity(entity)
        self._updateHook.handle_update()

        if self._in_memory_cache is not None:
            self._in_memory_cache[key_as_string] = value

    def get(self, key):
        """
        :param key: the key; its string representation is used as the row key
        :return: the value from the in-memory cache (if enabled and present), else the value from the table,
            or None if no value could be loaded
        """
        key_as_string = str(key)
        value = self._get_from_in_memory_cache(key_as_string)
        if value is None:
            value = self._get_from_table(key_as_string)
        return value

    def _get_from_table(self, key: str):
        partition_key = self._get_partition_key_for_row_key(key)
        entity = self._batch_commit_table.get_entity(partition_key, key)
        if entity is None:
            return None
        return entity[self.CACHE_VALUE_IDENTIFIER]

    def _get_from_in_memory_cache(self, key):
        if self._in_memory_cache is None:
            return None
        return self._in_memory_cache.get(str(key), None)

    def _get_partition_key_for_row_key(self, key: str):
        # without a generator, all entries share a single partition named like the table
        if self._partitionKeyGenerator is None:
            return self._batch_commit_table.table_name
        return self._partitionKeyGenerator(key)

    def _commit(self):
        # called by the update hook when there was no update for the configured delay
        self._batch_commit_table.commit_non_blocking_current_queue_state(self._maxBatchSize)

    def _periodically_commit(self):
        # called periodically by the update hook; commits only if the largest partition is large enough
        self._batch_commit_table.commit_blocking_largest_partition_from_queue(self._maxBatchSize, self._minSizeForPeriodicCommit)