Coverage for src/sensai/util/cache_azure.py: 0%
407 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-29 18:29 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-29 18:29 +0000
1from __future__ import annotations
2import collections
3import functools
4import pickle
5import sys
6from abc import ABC, abstractmethod
7from concurrent.futures.thread import ThreadPoolExecutor
8from typing import Callable, Dict, Union, Any, List, Sequence, Optional
9import json
10import logging
11import re
12import threading
15from azure.storage.table import TableService, TableBatch, Entity
16from azure.storage.blob import BlockBlobService
17import pandas as pd
18import numpy as np
20from .cache import PersistentKeyValueCache, PeriodicUpdateHook
22_log = logging.getLogger(__name__)
class Serialiser(ABC):
    """
    Interface for mechanisms which convert values that do not fit the table storage data model
    to a string representation and back, see
    https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-the-table-service-data-model
    """

    @abstractmethod
    def serialise(self, value) -> str:
        """
        :param value: the value to convert
        :return: the string representation of the value
        """

    @abstractmethod
    def deserialise(self, value: str):
        """
        :param value: a string representation of a value
        :return: the value restored from the string
        """
class NumpyArrayJsonSerialiser(Serialiser):
    """
    Serialiser which represents a numpy array as the JSON string of its (nested) list form
    """

    def serialise(self, value: np.ndarray) -> str:
        """
        :param value: the array to convert
        :return: JSON string of ``value.tolist()``
        """
        as_list = value.tolist()
        return json.dumps(as_list)

    def deserialise(self, value: str):
        """
        :param value: JSON string holding a (nested) list
        :return: numpy array built from the parsed list
        """
        as_list = json.loads(value)
        return np.array(as_list)
class PropertyLoader(ABC):
    """
    Interface of a customised loader for a single entity property
    """

    @abstractmethod
    def load_property_value(self, entity: Entity):
        """
        Hook applied to an entity which was read from a table

        :param entity: the entity to process
        """

    @abstractmethod
    def write_property_value(self, entity: Entity):
        """
        Hook applied to an entity which is about to be written to a table

        :param entity: the entity to process
        """

    @abstractmethod
    def load_property_value_to_data_frame_column(self, df: pd.DataFrame):
        """
        Hook applied to a data frame which was created from entities read from a table

        :param df: the data frame to process
        """
class SerialisedPropertyLoader(PropertyLoader):
    """
    PropertyLoader to serialise and de-serialise values. Useful, if type of values is not aligned with table storage data model,
    see https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-the-table-service-data-model
    """

    def __init__(self, property_name: str, serialiser: Serialiser):
        """
        :param property_name: name of the property whose value is (de-)serialised
        :param serialiser: the serialiser applied to the property value
        """
        self.serialiser = serialiser
        self.property_name = property_name

    def load_property_value(self, entity: Entity):
        """
        Replaces the serialised property value of the entity by its de-serialised value (in place).
        Entities which do not contain the property (e.g. because a query was restricted to other columns)
        are left untouched.

        :param entity: the entity read from the table
        """
        if self.property_name in entity:
            entity[self.property_name] = self.serialiser.deserialise(entity[self.property_name])

    def write_property_value(self, entity: Entity):
        """
        Replaces the property value of the entity by its serialised value (in place).
        Entities which do not contain the property are left untouched.

        :param entity: the entity which is about to be written to the table
        """
        if self.property_name in entity:
            entity[self.property_name] = self.serialiser.serialise(entity[self.property_name])

    def load_property_value_to_data_frame_column(self, df: pd.DataFrame):
        """
        De-serialises every value in the property's column (in place); does nothing if the column is absent.

        :param df: data frame created from table entities
        """
        if self.property_name in df.columns:
            df.loc[:, self.property_name] = [self.serialiser.deserialise(value) for value in df[self.property_name]]
class AzureTableBlobBackend(ABC):
    """
    Interface of a blob backend, which allows values kept in blob storage to be read and written
    conveniently via a reference to the value
    """

    @abstractmethod
    def get_value_from_reference(self, value_identifier: str):
        """
        :param value_identifier: reference to a stored value
        :return: the value the reference points to
        """

    @abstractmethod
    def get_value_reference(self, partition_key: str, row_key: str, value_name: str, blob_name_prefix: str = None) -> str:
        """
        :param partition_key: partition key of the entity the value belongs to
        :param row_key: row key of the entity the value belongs to
        :param value_name: name of the value
        :param blob_name_prefix: optional prefix for the blob name
        :return: the reference under which the value is to be stored
        """

    @abstractmethod
    def set_value_for_reference(self, value_identifier: str, value):
        """
        :param value_identifier: reference under which to store the value
        :param value: the value to store
        """
class BlobPerKeyAzureTableBlobBackend(AzureTableBlobBackend, ABC):
    """
    Backend which keeps every value in a blob of its own. Blob names have the form
    blobPrefix/partitionKey/rowKey/valueName<fileExtension>; the prefix is left out if it is None
    or equal to the partition key. References to values are the blob URLs.
    """

    def __init__(self, block_blob_service: BlockBlobService, container_name: str):
        """
        :param block_blob_service: https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blockblobservice.blockblobservice?view=azure-python-previous
        :param container_name: name of the container in which blobs are referenced; it is created if the
            service does not list it yet
        """
        self.block_blob_service = block_blob_service
        self.container_name = container_name
        known_containers = [container.name for container in block_blob_service.list_containers()]
        self.container_list = known_containers
        if container_name not in known_containers:
            self.block_blob_service.create_container(container_name)
            known_containers.append(container_name)

    @property
    @abstractmethod
    def file_extension(self):
        """
        :return: the suffix appended to every blob name
        """

    @abstractmethod
    def _get_blob_value(self, container_name, blob_name):
        """
        :return: the value stored in the given blob
        """

    @abstractmethod
    def _write_value_to_blob(self, container_name, blob_name, value):
        """
        Stores the value in the given blob
        """

    def get_value_from_reference(self, value_identifier: str):
        """
        :param value_identifier: blob URL as returned by :meth:`get_value_reference`
        :return: the value stored in the referenced blob
        """
        return self._get_blob_value(
            self._get_container_name_from_identifier(value_identifier),
            self._get_blob_name_from_identifier(value_identifier),
        )

    def get_value_reference(self, partition_key: str, row_key: str, value_name: str, blob_name_prefix: str = None) -> str:
        """
        :return: URL (as created by the blob service) of the blob belonging to the given keys
        """
        blob_name = self._get_blob_name_from_keys(partition_key, row_key, value_name, blob_prefix=blob_name_prefix)
        return self.block_blob_service.make_blob_url(self.container_name, blob_name)

    def set_value_for_reference(self, value_identifier: str, value):
        """
        :param value_identifier: blob URL as returned by :meth:`get_value_reference`
        :param value: the value to store in the referenced blob
        """
        self._write_value_to_blob(
            self._get_container_name_from_identifier(value_identifier),
            self._get_blob_name_from_identifier(value_identifier),
            value,
        )

    def _get_blob_name_from_identifier(self, value_identifier: str):
        # everything after "<primary endpoint>/<container>/" is the blob name
        _, _, path = value_identifier.partition(f"{self.block_blob_service.primary_endpoint}/")
        _, _, blob_name = path.partition("/")
        return blob_name

    def _get_container_name_from_identifier(self, value_identifier: str):
        # the first path segment after "<primary endpoint>/" is the container name
        _, _, path = value_identifier.partition(f"{self.block_blob_service.primary_endpoint}/")
        container_name, _, _ = path.partition("/")
        return container_name

    def _get_blob_name_from_keys(self, partition_key: str, row_key: str, value_name: str, blob_prefix: str = None):
        name_parts = [partition_key, row_key, value_name]
        if blob_prefix is not None and blob_prefix != partition_key:
            name_parts.insert(0, blob_prefix)
        return "/".join(name_parts) + self.file_extension
class TextDumpAzureTableBlobBackend(BlobPerKeyAzureTableBlobBackend):
    """
    Backend which stores values as text blobs without file extension,
    i.e. in the structure /tableName/partitionKey/rowKey/valueName
    """

    @property
    def file_extension(self):
        """
        :return: the empty string, i.e. blob names carry no extension
        """
        return ""

    def _get_blob_value(self, container_name, blob_name):
        text_blob = self.block_blob_service.get_blob_to_text(container_name, blob_name)
        return text_blob.content

    def _write_value_to_blob(self, container_name, blob_name, value):
        self.block_blob_service.create_blob_from_text(container_name, blob_name, value)
class JsonAzureTableBlobBackend(BlobPerKeyAzureTableBlobBackend):
    """
    Backend which stores values as json files in the structure /tableName/partitionKey/rowKey/valueName.json
    """

    @property
    def file_extension(self):
        """
        :return: the extension appended to blob names
        """
        return ".json"

    def _get_blob_value(self, container_name, blob_name):
        blob = self.block_blob_service.get_blob_to_bytes(container_name, blob_name)
        return self._decode_bytes_to_value(blob.content)

    def _write_value_to_blob(self, container_name, blob_name, value):
        self.block_blob_service.create_blob_from_bytes(container_name, blob_name, self._encode_value_to_bytes(value))

    @staticmethod
    def _encode_value_to_bytes(value):
        """
        :return: the encoded JSON representation of the value
        """
        return json.dumps(value).encode()

    @staticmethod
    def _decode_bytes_to_value(_bytes):
        """
        :return: the value parsed from encoded JSON
        """
        json_string = _bytes.decode()
        return json.loads(json_string)
class PickleAzureTableBlobBackend(JsonAzureTableBlobBackend):
    """
    Backend which stores values as pickle files in the structure /tableName/partitionKey/rowKey/valueName.pickle.
    Reading and writing of blobs is inherited; only the encoding of values differs.
    """

    @property
    def file_extension(self):
        """
        :return: the extension appended to blob names
        """
        return ".pickle"

    @staticmethod
    def _encode_value_to_bytes(value):
        """
        :return: the pickled value
        """
        return pickle.dumps(value)

    @staticmethod
    def _decode_bytes_to_value(_bytes):
        """
        :return: the value restored from pickled bytes
        """
        # NOTE(review): unpickling executes code embedded in the data; only use with blob containers
        # whose content is trusted.
        return pickle.loads(_bytes)
class BlobBackedPropertyLoader(PropertyLoader):
    """
    PropertyLoader to write and read values from blob backend via a reference to the value. Useful, if values cannot
    be stored in table storage itself, due to not being aligned with table storage data model,
    see https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-the-table-service-data-model
    """
    AZURE_ALLOWED_SIZE_PER_PROPERTY_BYTES = 64000
    AZURE_ALLOWED_STRING_LENGTH_PER_PROPERTY = 32000

    def __init__(self, property_name: str, blob_backend: AzureTableBlobBackend, blob_prefix: str = None,
            property_boolean_blob_status_name: str = None, max_workers=None):
        """
        :param property_name: name of property in table
        :param property_boolean_blob_status_name: name of property representing a boolean flag within a table, which indicates, if value is
            blob backed. If None, each value is assumed to be blob backed.
        :param blob_backend: actual backend to use for storage
        :param blob_prefix: prefix to use for blob in storage, e.g. a table name
        :param max_workers: maximal number of workers to load data from blob storage
        """
        self.blob_prefix = blob_prefix
        self.property_blob_status_name = property_boolean_blob_status_name
        self.blob_backend = blob_backend
        self.max_workers = max_workers
        self.propertyName = property_name

    def load_property_value(self, entity: Entity):
        """
        Replaces the reference stored in the entity by the referenced value (in place), if the
        entity's value is blob backed.

        :param entity: the entity read from the table
        """
        if self._is_entity_value_blob_backed(entity):
            entity[self.propertyName] = self.blob_backend.get_value_from_reference(entity[self.propertyName])

    def write_property_value(self, entity: Entity):
        """
        Writes the property value to the blob backend if required and replaces it in the entity by
        its reference (in place). If a status property name is configured, the entity's status flag
        is set to whether the value was written to the blob backend. Entities without the property
        are left untouched.

        :param entity: the entity which is about to be written to the table
        """
        if self.propertyName not in entity.keys():
            return
        value = entity[self.propertyName]
        is_blob_backed = self._need_to_write_to_blob(value)
        if is_blob_backed:
            value_identifier = self.blob_backend.get_value_reference(entity["PartitionKey"], entity["RowKey"], self.propertyName,
                blob_name_prefix=self.blob_prefix)
            self.blob_backend.set_value_for_reference(value_identifier, value)
            entity[self.propertyName] = value_identifier
        if self.property_blob_status_name is not None:
            entity[self.property_blob_status_name] = is_blob_backed

    def load_property_value_to_data_frame_column(self, df: pd.DataFrame):
        """
        Replaces references in the property's column by the referenced values (in place); does
        nothing if the column is absent.

        If no status property is configured, or the status column is not part of the data frame
        (e.g. because the query was restricted to other columns), every value is treated as blob
        backed, mirroring the entity-level behaviour. Otherwise only rows flagged by the status
        column are resolved.

        :param df: data frame created from table entities
        """
        if self.propertyName not in df.columns:
            return
        status_name = self.property_blob_status_name
        if status_name is None or status_name not in df.columns:
            df.loc[:, self.propertyName] = self._load_values_in_series(df[self.propertyName])
            return
        is_blob_backed = df[status_name]
        # nothing to resolve (and nothing to assign) if no row is flagged as blob backed
        if is_blob_backed.any():
            df.loc[is_blob_backed, self.propertyName] = \
                self._load_values_in_series(df.loc[is_blob_backed, self.propertyName])

    def _need_to_write_to_blob(self, value):
        """
        :return: True if no status property is configured (every value is blob backed) or the value
            exceeds one of the size limits defined on this class
        """
        if self.property_blob_status_name is None:
            return True
        # NOTE: sys.getsizeof is shallow, i.e. it does not include the size of objects a container refers to
        if sys.getsizeof(value) > self.AZURE_ALLOWED_SIZE_PER_PROPERTY_BYTES:
            return True
        if isinstance(value, str) and len(value) > self.AZURE_ALLOWED_STRING_LENGTH_PER_PROPERTY:
            return True
        return False

    def _is_entity_value_blob_backed(self, entity: Entity):
        """
        :return: False if the entity lacks the property; True if no status flag is configured or present
            in the entity; otherwise the value of the entity's status flag
        """
        if self.propertyName not in entity.keys():
            return False
        if self.property_blob_status_name is None or self.property_blob_status_name not in entity:
            return True
        return entity[self.property_blob_status_name]

    def _load_values_in_series(self, _series: pd.Series):
        """
        Resolves all references of the series concurrently using a thread pool

        :return: list of the referenced values, in the order of the series
        """
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            _series = list(executor.map(self.blob_backend.get_value_from_reference, _series))
        return _series
class BlobBackedSerialisedPropertyLoader(BlobBackedPropertyLoader, SerialisedPropertyLoader):
    """
    Property loader, which combines serialisation and blob backing: values are serialised before
    they are handed to the blob-backed writer, and de-serialised after they were resolved by the
    blob-backed reader.
    """

    def __init__(self, property_name, serialiser: Serialiser, blob_backend: AzureTableBlobBackend, blob_prefix: str = None,
            property_boolean_blob_status_name: str = None, max_workers=None):
        """
        :param property_name: name of property in table
        :param serialiser: serialiser applied to the property value
        :param property_boolean_blob_status_name: name of property representing a boolean flag within a table, which indicates, if value is
            blob backed. If None, each value is assumed to be blob backed.
        :param blob_backend: actual backend to use for storage
        :param blob_prefix: prefix to use for blob in storage, e.g. a table name
        :param max_workers: maximal number of workers to load data from blob storage
        """
        SerialisedPropertyLoader.__init__(self, property_name, serialiser)
        BlobBackedPropertyLoader.__init__(self, property_name, blob_backend, blob_prefix, property_boolean_blob_status_name, max_workers)

    # NOTE: The parent implementations are called explicitly. ``super(X, self)`` dispatches to the class
    # *after* X in the method resolution order, so ``super(BlobBackedPropertyLoader, self)`` would call
    # SerialisedPropertyLoader and ``super(SerialisedPropertyLoader, self)`` the abstract no-op of
    # PropertyLoader, i.e. the blob backend would never be used.

    def load_property_value(self, entity: Entity):
        """
        Resolves the blob reference of the entity's property (if blob backed) and then de-serialises the value

        :param entity: the entity read from the table
        """
        BlobBackedPropertyLoader.load_property_value(self, entity)
        SerialisedPropertyLoader.load_property_value(self, entity)

    def write_property_value(self, entity: Entity):
        """
        Serialises the entity's property value and then applies the blob-backed writer to the serialised value

        :param entity: the entity which is about to be written to the table
        """
        SerialisedPropertyLoader.write_property_value(self, entity)
        BlobBackedPropertyLoader.write_property_value(self, entity)

    def load_property_value_to_data_frame_column(self, df: pd.DataFrame):
        """
        Resolves blob references in the property's column and then de-serialises the column's values

        :param df: data frame created from table entities
        """
        BlobBackedPropertyLoader.load_property_value_to_data_frame_column(self, df)
        SerialisedPropertyLoader.load_property_value_to_data_frame_column(self, df)
class AzureLazyBatchCommitTable:
    """
    Wrapper for an Azure table, which allows for convenient insertion via lazy batch execution per partition.
    Insertion commands are queued per partition key; when committing, the partition with the most
    pending commands is handled first.
    To execute insertions, call one of :meth:`commit_blocking_until_empty`,
    :meth:`commit_non_blocking_current_queue_state` or :meth:`commit_blocking_largest_partition_from_queue`.
    """

    AZURE_ALLOWED_TABLE_NAME_PATTERN = re.compile("^[A-Za-z][A-Za-z0-9]{2,62}$")
    AZURE_ALLOWED_TABLE_BATCH_SIZE = 100

    class PartitionCommandsPriorityQueue:
        """
        Collection of pending commands grouped by partition key, where the group with the most
        commands is popped first. The public methods hold a lock while accessing the collection.
        """

        class PartitionCommands:
            """
            The pending commands of a single partition
            """

            def __init__(self, partition_key):
                self.partition_key = partition_key
                self._command_list = collections.deque()

            def __len__(self):
                return len(self._command_list)

            def append(self, command):
                self._command_list.append(command)

            def execute(self, context_manager: Callable[[], TableBatch], batch_size: int):
                """
                Applies all pending commands, using one batch context per slice of at most ``batch_size`` commands

                :param context_manager: factory of the batch context the commands are applied to
                :param batch_size: maximal number of commands per batch
                """
                while len(self._command_list) > 0:
                    _slice = [self._command_list.popleft() for _ in range(min(batch_size, len(self._command_list)))]
                    _log.info(f"Committing {len(_slice)} cache entries to the database")
                    with context_manager() as batch:
                        for command in _slice:
                            command(batch)

        def __init__(self):
            self.partition_commands_queue = []
            self.partition_key2_commands = {}
            self._thread_lock = threading.Lock()

        def add_command(self, partition_key, command: Union[Callable[[TableBatch], Any], functools.partial[TableBatch]]):
            """
            Add a command to queue of corresponding partitionKey

            :param partition_key: key of the partition the command belongs to
            :param command: a callable on a TableBatch
            """
            with self._thread_lock:
                if partition_key not in self.partition_key2_commands:
                    commands = self.PartitionCommands(partition_key)
                    self.partition_commands_queue.append(commands)
                    self.partition_key2_commands[partition_key] = commands
                self.partition_key2_commands[partition_key].append(command)

        def pop(self, min_length: int = None) -> Optional[AzureLazyBatchCommitTable.PartitionCommandsPriorityQueue.PartitionCommands]:
            """
            :param min_length: minimal length of largest PartitionCommands for the pop to take place.
            :return: largest PartitionCommands or None if the queue is empty or minimal length is not reached
            """
            with self._thread_lock:
                return self._pop(min_length)

        def pop_all(self):
            """
            :return: list of all PartitionCommands (largest first); the queue is empty afterwards
            """
            with self._thread_lock:
                commands_list = []
                while not self._is_empty():
                    commands_list.append(self._pop())
                return commands_list

        def is_empty(self):
            with self._thread_lock:
                return self._is_empty()

        def _pop(self, min_length=None):
            length, index = self._get_max_priority_info()
            if index is not None and (min_length is None or length >= min_length):
                q = self.partition_commands_queue.pop(index)
                del self.partition_key2_commands[q.partition_key]
                return q
            return None

        def _is_empty(self):
            return len(self.partition_commands_queue) == 0

        def _get_max_priority_info(self):
            """
            :return: pair (length of the largest PartitionCommands, index of its first occurrence)
                or (0, None) if the queue is empty
            """
            lengths_list = list(map(len, self.partition_commands_queue))
            if len(lengths_list) == 0:
                return 0, None
            max_length = max(lengths_list)
            return max_length, lengths_list.index(max_length)

    def __init__(self, table_name: str, table_service: TableService, property_loaders: Sequence[PropertyLoader] = ()):
        """
        :param table_name: name of table; the table is created if it does not exist
        :param table_service: instance of :class:`azure.storage.table.TableService` to connect to Azure table storage
        :param property_loaders: loaders applied to entities before they are queued for insertion and
            after they were read
        :raises ValueError: if the table name does not match the allowed pattern
        """
        if not self.AZURE_ALLOWED_TABLE_NAME_PATTERN.match(table_name):
            raise ValueError(f"Invalid table name {table_name}, see: "
                f"https://docs.microsoft.com/en-us/rest/api/storageservices/Understanding-the-Table-Service-Data-Model")

        self.table_service = table_service
        self.table_name = table_name
        self.property_loaders = property_loaders
        self._partition_queues = self.PartitionCommandsPriorityQueue()
        self._context_manager = functools.partial(self.table_service.batch, self.table_name)

        if not self.exists():
            self.table_service.create_table(self.table_name)

    def insert_or_replace_entity(self, entity: Union[Dict, Entity]):
        """
        Lazy wrapper method for :func:`azure.storage.table.TableService.insert_or_replace_entity`.
        The property loaders are applied to the entity (in place) right away; the insertion itself is queued.

        :param entity: the entity to insert; must contain "PartitionKey"
        """
        partition_key = entity["PartitionKey"]
        for property_loader in self.property_loaders:
            property_loader.write_property_value(entity)
        execution_command = functools.partial(self._insert_or_replace_entity_via_batch, entity)
        self._partition_queues.add_command(partition_key, execution_command)

    def insert_entity(self, entity: Union[Dict, Entity]):
        """
        Lazy wrapper method for :func:`azure.storage.table.TableService.insert_entity`.
        The property loaders are applied to the entity (in place) right away; the insertion itself is queued.

        :param entity: the entity to insert; must contain "PartitionKey"
        """
        partition_key = entity["PartitionKey"]
        for property_loader in self.property_loaders:
            property_loader.write_property_value(entity)
        execution_command = functools.partial(self._insert_entity_via_batch, entity)
        self._partition_queues.add_command(partition_key, execution_command)

    def get_entity(self, partition_key: str, row_key: str) -> Optional[Entity]:
        """
        Wraps :func:`azure.storage.table.TableService.get_entity`

        :param partition_key: partition key of the entity
        :param row_key: row key of the entity
        :return: the entity with property loaders applied, or None if retrieving or loading failed
        """
        # Deliberately best-effort: any failure is logged on debug level and reported as a missing entity
        try:
            entity = self.table_service.get_entity(self.table_name, partition_key, row_key)
            for property_loader in self.property_loaders:
                property_loader.load_property_value(entity)
            return entity
        except Exception as e:
            _log.debug(f"Unable to load value for partitionKey {partition_key} and rowKey {row_key} from table {self.table_name}: {e}")
            return None

    def commit_blocking_until_empty(self, max_batch_size=AZURE_ALLOWED_TABLE_BATCH_SIZE):
        """
        Commit insertion commands. Commands are executed batch-wise per partition until partition queue is empty in a
        blocking manner.

        :param max_batch_size: maximal batch size to use for batch insertion, must be less or equal to batch size allowed by Azure
        """
        max_batch_size = self._validate_max_batch_size(max_batch_size)

        # pop() returns None exactly when the queue is empty; checking the popped value (rather than
        # is_empty() beforehand) remains correct if another thread empties the queue concurrently
        while True:
            commands = self._partition_queues.pop()
            if commands is None:
                break
            commands.execute(self._context_manager, max_batch_size)

    def commit_non_blocking_current_queue_state(self, max_batch_size=AZURE_ALLOWED_TABLE_BATCH_SIZE):
        """
        Commit insertion commands. Empties the current PartitionCommandsQueue in a non blocking way,
        i.e. in a newly started (non-daemon) thread.
        Commands are executed batch-wise per partition.

        :param max_batch_size: maximal batch size to use for batch insertion, must be less or equal to batch size allowed by Azure
        """
        max_batch_size = self._validate_max_batch_size(max_batch_size)

        def commit():
            commands_list = self._partition_queues.pop_all()
            for commands in commands_list:
                commands.execute(self._context_manager, max_batch_size)

        thread = threading.Thread(target=commit, daemon=False)
        thread.start()

    def commit_blocking_largest_partition_from_queue(self, max_batch_size=AZURE_ALLOWED_TABLE_BATCH_SIZE, min_length=None):
        """
        Commits in a blocking way the largest partition from PartitionCommandsQueue

        :param max_batch_size: maximal batch size to use for batch insertion, must be less or equal to batch size allowed by Azure
        :param min_length: minimal size of largest partition. If not None, pop and commit only if minLength is reached.
        """
        max_batch_size = self._validate_max_batch_size(max_batch_size)
        commands = self._partition_queues.pop(min_length)
        if commands is not None:
            commands.execute(self._context_manager, max_batch_size)

    def _validate_max_batch_size(self, max_batch_size):
        """
        :param max_batch_size: the requested batch size
        :return: the batch size to use, i.e. the requested size capped at the allowed batch size
        :raises ValueError: if the requested size is smaller than 1 (committing would never terminate)
        """
        if max_batch_size < 1:
            raise ValueError(f"maxBatchSize must be at least 1, got {max_batch_size}")
        if max_batch_size > self.AZURE_ALLOWED_TABLE_BATCH_SIZE:
            _log.warning(f"Provided maxBatchSize is larger than allowed size {self.AZURE_ALLOWED_TABLE_BATCH_SIZE}. "
                f"Will use maxBatchSize {self.AZURE_ALLOWED_TABLE_BATCH_SIZE} instead.")
            max_batch_size = self.AZURE_ALLOWED_TABLE_BATCH_SIZE
        return max_batch_size

    def load_table_to_data_frame(self, columns: List[str] = None, row_filter_query: str = None, num_records: int = None):
        """
        Load all rows of table to :class:`~pandas.DataFrame`

        :param row_filter_query: filter passed on to the table query
        :param num_records: if not None, stop reading once this many records were collected
        :param columns: restrict loading to provided columns
        :return: :class:`~pandas.DataFrame`
        """
        if num_records is None:
            records = list(self._iter_records(columns, row_filter_query))
        else:
            records = []
            for record in self._iter_records(columns, row_filter_query):
                records.append(record)
                if len(records) >= num_records:
                    break
        return self._records_to_data_frame(records, columns)

    def iter_data_frame_chunks(self, chunk_size: int, columns: List[str] = None, row_filter_query: str = None):
        """
        Get a generator of dataframe chunks. Every chunk holds ``chunk_size`` rows, except for the last
        one, which holds the remaining rows.

        :param row_filter_query: filter passed on to the table query
        :param chunk_size: number of rows per chunk
        :param columns: restrict loading to provided columns
        :return: generator of :class:`~pandas.DataFrame`
        """
        records = []
        for record in self._iter_records(columns, row_filter_query):
            records.append(record)
            if len(records) >= chunk_size:
                yield self._records_to_data_frame(records, columns)
                records = []
        # the records left over after the last full chunk must not be lost
        if records:
            yield self._records_to_data_frame(records, columns)

    def _records_to_data_frame(self, records: list, columns: Optional[List[str]]) -> pd.DataFrame:
        """
        Creates a data frame from the given records and applies all property loaders to it
        """
        df = pd.DataFrame(records, columns=columns)
        for property_loader in self.property_loaders:
            property_loader.load_property_value_to_data_frame_column(df)
        return df

    def iter_records(self, columns: List[str] = None, row_filter_query: str = None):
        """
        Get a generator of table entities, to which the property loaders have been applied

        :param row_filter_query: filter passed on to the table query
        :param columns: restrict loading to provided columns
        :return: generator of entities
        """
        for entity in self._iter_records(columns, row_filter_query):
            for property_loader in self.property_loaders:
                property_loader.load_property_value(entity)
            yield entity

    def _iter_records(self, columns: Optional[List[str]], row_filter_query: Optional[str]):
        """
        :return: the result of querying the table's entities, without property loaders applied
        """
        column_names_as_comma_separated_string = None
        if columns is not None:
            column_names_as_comma_separated_string = ",".join(columns)
        return self.table_service.query_entities(self.table_name, select=column_names_as_comma_separated_string,
            filter=row_filter_query)

    def insert_data_frame_to_table(self, df: pd.DataFrame, partition_key_generator: Callable[[str], str] = None, num_records: int = None):
        """
        Inserts or replace entities of the table corresponding to rows of the DataFrame, where the index of the dataFrame acts as rowKey.
        Values of object type columns in the dataFrame may have to be serialised via json beforehand.
        The insertions are queued and need to be committed via one of the commit methods.

        :param df: DataFrame to be inserted
        :param partition_key_generator: if None, partitionKeys default to tableName
        :param num_records: restrict insertion to first numRecords rows, merely for testing
        """
        for (count, (idx, row)) in enumerate(df.iterrows()):
            if num_records is not None:
                if count >= num_records:
                    break
            entity = row.to_dict()
            entity["RowKey"] = idx
            entity["PartitionKey"] = self.table_name if partition_key_generator is None else partition_key_generator(idx)
            self.insert_or_replace_entity(entity)

    @staticmethod
    def _insert_or_replace_entity_via_batch(entity, batch: TableBatch):
        return batch.insert_or_replace_entity(entity)

    @staticmethod
    def _insert_entity_via_batch(entity, batch: TableBatch):
        return batch.insert_entity(entity)

    def exists(self):
        """
        :return: result of the table service's existence check for this table
        """
        return self.table_service.exists(self.table_name)
class AzureTablePersistentKeyValueCache(PersistentKeyValueCache):
    """
    PersistentKeyValueCache using Azure Table Storage, see https://docs.microsoft.com/en-gb/azure/storage/tables/
    """
    CACHE_VALUE_IDENTIFIER = "cache_value"

    def __init__(self, table_service: TableService, table_name="cache", partition_key_generator: Callable[[str], str] = None,
            max_batch_size=100, min_size_for_periodic_commit: Optional[int] = 100, deferred_commit_delay_secs=1.0, in_memory=False,
            blob_backend: AzureTableBlobBackend = None, serialiser: Serialiser = None, max_workers: int = None):
        """
        :param table_service: https://docs.microsoft.com/en-us/python/api/azure-cosmosdb-table/azure.cosmosdb.table.tableservice.tableservice?view=azure-python
        :param table_name: name of table, needs to match restrictions for Azure storage resources, see https://docs.microsoft.com/en-gb/azure/azure-resource-manager/management/resource-name-rules
        :param partition_key_generator: callable to generate a partitionKey from provided string, if None partitionKey in requests defaults
            to tableName
        :param max_batch_size: maximal batch size for each commit.
        :param deferred_commit_delay_secs: the time frame during which no new data must be added for a pending transaction to be committed
        :param min_size_for_periodic_commit: minimal size of a batch to be committed in a periodic thread.
            If None, commits are only executed in a deferred manner, i.e. commit only if there is no update for `deferred_commit_delay_secs`
        :param in_memory: boolean flag, to indicate, if table should be loaded in memory at construction
        :param blob_backend: if not None, blob storage will be used to store actual value and cache_value in table only contains a reference
        :param serialiser: if not None, values are serialised before they are stored and de-serialised after they were loaded
        :param max_workers: maximal number of workers to load data from blob backend
        """
        self._deferredCommitDelaySecs = deferred_commit_delay_secs
        self._partitionKeyGenerator = partition_key_generator

        # name of the boolean property which flags whether a value is kept in blob storage
        # (only relevant if a blob backend is used)
        blob_status_name = self.CACHE_VALUE_IDENTIFIER + "_blob_backed" if blob_backend is not None else None

        if blob_backend is None:
            if serialiser is None:
                property_loaders = ()
            else:
                property_loaders = (SerialisedPropertyLoader(self.CACHE_VALUE_IDENTIFIER, serialiser),)
        elif serialiser is None:
            property_loaders = (BlobBackedPropertyLoader(self.CACHE_VALUE_IDENTIFIER, blob_backend, table_name,
                blob_status_name, max_workers),)
        else:
            property_loaders = (BlobBackedSerialisedPropertyLoader(self.CACHE_VALUE_IDENTIFIER, serialiser, blob_backend,
                table_name, blob_status_name, max_workers),)

        self._batch_commit_table = AzureLazyBatchCommitTable(table_name, table_service, property_loaders=property_loaders)
        self._minSizeForPeriodicCommit = min_size_for_periodic_commit
        self._maxBatchSize = max_batch_size
        self._updateHook = PeriodicUpdateHook(deferred_commit_delay_secs, no_update_fn=self._commit, periodic_fn=self._periodically_commit)

        self._in_memory_cache = None

        if in_memory:
            columns = ['RowKey', self.CACHE_VALUE_IDENTIFIER]
            if blob_status_name is not None:
                # the blob-backed property loader reads the status column to decide which values are
                # references, so the column has to be loaded along with the values
                columns.append(blob_status_name)
            df = self._batch_commit_table.load_table_to_data_frame(columns=columns).set_index("RowKey")
            _log.info(f"Loaded {len(df)} entries of table {table_name} in memory")
            self._in_memory_cache = df[self.CACHE_VALUE_IDENTIFIER].to_dict()

    def set(self, key, value):
        """
        Queues the insertion of the value under the string representation of the key, notifies the
        update hook and updates the in-memory cache (if enabled)

        :param key: the key; its string representation is used as row key
        :param value: the value to store
        """
        key_as_string = str(key)
        partition_key = self._get_partition_key_for_row_key(key_as_string)
        entity = {'PartitionKey': partition_key, 'RowKey': key_as_string, self.CACHE_VALUE_IDENTIFIER: value}
        self._batch_commit_table.insert_or_replace_entity(entity)
        self._updateHook.handle_update()

        if self._in_memory_cache is not None:
            self._in_memory_cache[key_as_string] = value

    def get(self, key):
        """
        :param key: the key; its string representation is used as row key
        :return: the value from the in-memory cache if present there, otherwise the value read from
            the table, or None if no value could be loaded
        """
        key_as_string = str(key)
        value = self._get_from_in_memory_cache(key_as_string)
        if value is None:
            value = self._get_from_table(key_as_string)
        return value

    def _get_from_table(self, key: str):
        partition_key = self._get_partition_key_for_row_key(key)
        entity = self._batch_commit_table.get_entity(partition_key, key)
        if entity is not None:
            return entity[self.CACHE_VALUE_IDENTIFIER]
        return None

    def _get_from_in_memory_cache(self, key):
        if self._in_memory_cache is None:
            return None
        return self._in_memory_cache.get(str(key), None)

    def _get_partition_key_for_row_key(self, key: str):
        return self._batch_commit_table.table_name if self._partitionKeyGenerator is None else self._partitionKeyGenerator(key)

    def _commit(self):
        # deferred commit: flush everything which is currently queued without blocking the caller
        self._batch_commit_table.commit_non_blocking_current_queue_state(self._maxBatchSize)

    def _periodically_commit(self):
        # As documented for min_size_for_periodic_commit, None disables periodic commits. Passing None
        # on as the minimal length would instead commit the largest partition unconditionally.
        if self._minSizeForPeriodicCommit is None:
            return
        self._batch_commit_table.commit_blocking_largest_partition_from_queue(self._maxBatchSize, self._minSizeForPeriodicCommit)