from __future__ import annotations
import collections
import functools
import pickle
import sys
from abc import ABC, abstractmethod
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Callable, Dict, Union, Any, List, Sequence, Optional
import json
import logging
import re
import threading
from azure.storage.table import TableService, TableBatch, Entity
from azure.storage.blob import BlockBlobService
import pandas as pd
import numpy as np
from .cache import PersistentKeyValueCache, PeriodicUpdateHook
_log = logging.getLogger(__name__)


class Serialiser(ABC):
    """
    Abstraction for mechanisms to serialise values which do not fit the table storage data model,
    see https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-the-table-service-data-model
    """

    @abstractmethod
    def serialise(self, value) -> str:
        pass

    @abstractmethod
    def deserialise(self, value: str):
        pass


class NumpyArrayJsonSerialiser(Serialiser):
    """
    Serialises a numpy array as a JSON string of its list representation
    """

    def serialise(self, value: np.ndarray) -> str:
        return json.dumps(value.tolist())

    def deserialise(self, value: str):
        return np.array(json.loads(value))
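

# Usage sketch (illustrative): a value that does not fit the table storage data model, such as a
# numpy array, can be round-tripped through the serialiser before being stored as a string property.
def _example_numpy_array_serialisation():
    serialiser = NumpyArrayJsonSerialiser()
    original = np.array([[1.0, 2.0], [3.0, 4.0]])
    stored_string = serialiser.serialise(original)  # '[[1.0, 2.0], [3.0, 4.0]]'
    restored = serialiser.deserialise(stored_string)
    assert np.array_equal(original, restored)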


class PropertyLoader(ABC):
    """
    Abstraction of a customised loader for an entity property
    """

    @abstractmethod
    def load_property_value(self, entity: Entity):
        pass

    @abstractmethod
    def write_property_value(self, entity: Entity):
        pass

    @abstractmethod
    def load_property_value_to_data_frame_column(self, df: pd.DataFrame):
        pass


class SerialisedPropertyLoader(PropertyLoader):
    """
    PropertyLoader which serialises values on write and de-serialises them on load. Useful if the type of the values
    does not conform to the table storage data model, see
    https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-the-table-service-data-model
    """

    def __init__(self, property_name: str, serialiser: Serialiser):
        self.serialiser = serialiser
        self.property_name = property_name

    def load_property_value(self, entity: Entity):
        entity[self.property_name] = self.serialiser.deserialise(entity[self.property_name])

    def write_property_value(self, entity: Entity):
        entity[self.property_name] = self.serialiser.serialise(entity[self.property_name])

    def load_property_value_to_data_frame_column(self, df: pd.DataFrame):
        if self.property_name in df.columns:
            df.loc[:, self.property_name] = [self.serialiser.deserialise(value) for value in df[self.property_name]]
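

# Usage sketch (illustrative): write_property_value serialises the named property in place,
# and load_property_value reverses the transformation.
def _example_serialised_property_loader():
    loader = SerialisedPropertyLoader("cache_value", NumpyArrayJsonSerialiser())
    entity = {"PartitionKey": "p", "RowKey": "r", "cache_value": np.array([1, 2, 3])}
    loader.write_property_value(entity)  # entity["cache_value"] is now the JSON string "[1, 2, 3]"
    loader.load_property_value(entity)  # entity["cache_value"] is a numpy array again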


class AzureTableBlobBackend(ABC):
    """
    Abstraction of a blob backend, which allows values stored in blob storage to be set and retrieved conveniently
    via a reference to the value
    """

    @abstractmethod
    def get_value_from_reference(self, value_identifier: str):
        pass

    @abstractmethod
    def get_value_reference(self, partition_key: str, row_key: str, value_name: str, blob_name_prefix: str = None) -> str:
        pass

    @abstractmethod
    def set_value_for_reference(self, value_identifier: str, value):
        pass


class BlobPerKeyAzureTableBlobBackend(AzureTableBlobBackend, ABC):
    """
    Backend which stores serialised values as /tableName/partitionKey/rowKey/valueName.<fileExtension>,
    or as /tableName/rowKey/valueName.<fileExtension> if the partitionKey equals the tableName
    """
def __init__(self, block_blob_service: BlockBlobService, container_name: str):
"""
:param block_blob_service: https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blockblobservice.blockblobservice?view=azure-python-previous
"""
self.block_blob_service = block_blob_service
self.container_name = container_name
self.container_list = [container.name for container in block_blob_service.list_containers()]
if container_name not in self.container_list:
self.block_blob_service.create_container(container_name)
self.container_list.append(container_name)
@property
@abstractmethod
def file_extension(self):
pass
@abstractmethod
def _get_blob_value(self, container_name, blob_name):
pass
@abstractmethod
def _write_value_to_blob(self, container_name, blob_name, value):
pass

    def get_value_from_reference(self, value_identifier: str):
        container_name = self._get_container_name_from_identifier(value_identifier)
        blob_name = self._get_blob_name_from_identifier(value_identifier)
        return self._get_blob_value(container_name, blob_name)

    def get_value_reference(self, partition_key: str, row_key: str, value_name: str, blob_name_prefix: str = None) -> str:
        blob_name = self._get_blob_name_from_keys(partition_key, row_key, value_name, blob_prefix=blob_name_prefix)
        return self.block_blob_service.make_blob_url(self.container_name, blob_name)

    def set_value_for_reference(self, value_identifier: str, value):
        container_name = self._get_container_name_from_identifier(value_identifier)
        blob_name = self._get_blob_name_from_identifier(value_identifier)
        self._write_value_to_blob(container_name, blob_name, value)
def _get_blob_name_from_identifier(self, value_identifier: str):
return (value_identifier.partition(f"{self.block_blob_service.primary_endpoint}/")[2]).partition("/")[2]
def _get_container_name_from_identifier(self, value_identifier: str):
return (value_identifier.partition(f"{self.block_blob_service.primary_endpoint}/")[2]).partition("/")[0]
def _get_blob_name_from_keys(self, partition_key: str, row_key: str, value_name: str, blob_prefix: str = None):
identifier_list = [blob_prefix, partition_key] if blob_prefix is not None and blob_prefix != partition_key else [partition_key]
identifier_list.extend([row_key, value_name])
return "/".join(identifier_list) + self.file_extension
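
# Worked example of the naming scheme implemented above (illustrative values): with blob_prefix equal
# to the table name, the partition key is omitted from the blob name when it coincides with the prefix.
#   _get_blob_name_from_keys("mytable", "row1", "cache_value", blob_prefix="mytable")
#       -> "mytable/row1/cache_value" + file_extension
#   _get_blob_name_from_keys("partitionA", "row1", "cache_value", blob_prefix="mytable")
#       -> "mytable/partitionA/row1/cache_value" + file_extension
# get_value_reference then returns the blob URL of that name within self.container_name.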


class TextDumpAzureTableBlobBackend(BlobPerKeyAzureTableBlobBackend):
"""
Backend stores values as txt files in the structure /tableName/partitionKey/rowKey/valueName
"""
@property
def file_extension(self):
return ""
def _get_blob_value(self, container_name, blob_name):
return self.block_blob_service.get_blob_to_text(container_name, blob_name).content
def _write_value_to_blob(self, container_name, blob_name, value):
self.block_blob_service.create_blob_from_text(container_name, blob_name, value)


class JsonAzureTableBlobBackend(BlobPerKeyAzureTableBlobBackend):
"""
Backend stores values as json files in the structure /tableName/partitionKey/rowKey/valueName.json
"""
@property
def file_extension(self):
return ".json"
def _get_blob_value(self, container_name, blob_name):
encoded_value = self.block_blob_service.get_blob_to_bytes(container_name, blob_name).content
return self._decode_bytes_to_value(encoded_value)
def _write_value_to_blob(self, container_name, blob_name, value):
encoded_value = self._encode_value_to_bytes(value)
self.block_blob_service.create_blob_from_bytes(container_name, blob_name, encoded_value)
@staticmethod
def _encode_value_to_bytes(value):
return str.encode(json.dumps(value))
@staticmethod
def _decode_bytes_to_value(_bytes):
return json.loads(_bytes.decode())


class PickleAzureTableBlobBackend(JsonAzureTableBlobBackend):
"""
Backend stores values as pickle files in the structure /tableName/partitionKey/rowKey/valueName.pickle
"""
@property
def file_extension(self):
return ".pickle"
@staticmethod
def _encode_value_to_bytes(value):
return pickle.dumps(value)
@staticmethod
def _decode_bytes_to_value(_bytes):
return pickle.loads(_bytes)
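

# Usage sketch (illustrative; the account name, key and container name are placeholders): a blob
# backend is constructed from a BlockBlobService and a container name, and stores one blob per value.
def _example_create_json_blob_backend() -> JsonAzureTableBlobBackend:
    blob_service = BlockBlobService(account_name="<account>", account_key="<key>")
    return JsonAzureTableBlobBackend(blob_service, "cachecontainer")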


class BlobBackedPropertyLoader(PropertyLoader):
    """
    PropertyLoader which writes values to and reads values from a blob backend via a reference to the value.
    Useful if values cannot be stored in table storage itself because they do not conform to the table storage data model,
    see https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-the-table-service-data-model
    """
    AZURE_ALLOWED_SIZE_PER_PROPERTY_BYTES = 64000
    AZURE_ALLOWED_STRING_LENGTH_PER_PROPERTY = 32000
def __init__(self, property_name: str, blob_backend: AzureTableBlobBackend, blob_prefix: str = None,
property_boolean_blob_status_name: str = None, max_workers=None):
"""
:param property_name: name of property in table
:param property_boolean_blob_status_name: name of property representing a boolean flag within a table, which indicates, if value is
blob backed. If None, each value is assumed to be blob backed.
:param blob_backend: actual backend to use for storage
:param blob_prefix: prefix to use for blob in storage, e.g. a table name
:param max_workers: maximal number of workers to load data from blob storage
"""
self.blob_prefix = blob_prefix
self.property_blob_status_name = property_boolean_blob_status_name
self.blob_backend = blob_backend
self.max_workers = max_workers
self.propertyName = property_name

    def load_property_value(self, entity: Entity):
        if self._is_entity_value_blob_backed(entity):
            entity[self.propertyName] = self.blob_backend.get_value_from_reference(entity[self.propertyName])

    def write_property_value(self, entity: Entity):
if self.propertyName in entity.keys():
if self._need_to_write_to_blob(entity[self.propertyName]):
value_identifier = self.blob_backend.get_value_reference(entity["PartitionKey"], entity["RowKey"], self.propertyName,
blob_name_prefix=self.blob_prefix)
value = entity[self.propertyName]
self.blob_backend.set_value_for_reference(value_identifier, value)
entity[self.propertyName] = value_identifier
property_blob_status = True if self.property_blob_status_name is not None else None
else:
property_blob_status = False if self.property_blob_status_name is not None else None
if property_blob_status is not None:
entity[self.property_blob_status_name] = property_blob_status

    def load_property_value_to_data_frame_column(self, df: pd.DataFrame):
if self.propertyName in df.columns:
if self.property_blob_status_name is None:
df.loc[:, self.propertyName] = self._load_values_in_series(df[self.propertyName])
else:
df.loc[df[self.property_blob_status_name], self.propertyName] = \
self._load_values_in_series(df.loc[df[self.property_blob_status_name], self.propertyName])
def _need_to_write_to_blob(self, value):
if self.property_blob_status_name is None:
return True
if sys.getsizeof(value) > self.AZURE_ALLOWED_SIZE_PER_PROPERTY_BYTES:
return True
if isinstance(value, str) and len(value) > self.AZURE_ALLOWED_STRING_LENGTH_PER_PROPERTY:
return True
return False
def _is_entity_value_blob_backed(self, entity: Entity):
if self.propertyName not in entity.keys():
return False
if self.property_blob_status_name is None or self.property_blob_status_name not in entity:
return True
return entity[self.property_blob_status_name]
def _load_values_in_series(self, _series: pd.Series):
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
_series = list(executor.map(self.blob_backend.get_value_from_reference, _series))
return _series


class BlobBackedSerialisedPropertyLoader(BlobBackedPropertyLoader, SerialisedPropertyLoader):
    """
    Property loader which combines serialisation and blob backing
    """

    def __init__(self, property_name, serialiser: Serialiser, blob_backend: AzureTableBlobBackend, blob_prefix: str = None,
            property_boolean_blob_status_name: str = None, max_workers=None):
        """
        :param property_name: name of the property in the table
        :param serialiser: the serialiser used to convert values to and from strings
        :param property_boolean_blob_status_name: name of the property representing a boolean flag within the table, which
            indicates whether the value is blob backed. If None, each value is assumed to be blob backed.
        :param blob_backend: the actual backend to use for storage
        :param blob_prefix: prefix to use for blobs in storage, e.g. a table name
        :param max_workers: maximal number of workers used to load data from blob storage
        """
        SerialisedPropertyLoader.__init__(self, property_name, serialiser)
        BlobBackedPropertyLoader.__init__(self, property_name, blob_backend, blob_prefix, property_boolean_blob_status_name, max_workers)
    def load_property_value(self, entity: Entity):
        # first resolve the blob reference, then de-serialise the retrieved value
        BlobBackedPropertyLoader.load_property_value(self, entity)
        SerialisedPropertyLoader.load_property_value(self, entity)

    def write_property_value(self, entity: Entity):
        # first serialise the value, then (if required) move it to blob storage
        SerialisedPropertyLoader.write_property_value(self, entity)
        BlobBackedPropertyLoader.write_property_value(self, entity)

    def load_property_value_to_data_frame_column(self, df: pd.DataFrame):
        # resolve blob references first, then de-serialise the column values
        BlobBackedPropertyLoader.load_property_value_to_data_frame_column(self, df)
        SerialisedPropertyLoader.load_property_value_to_data_frame_column(self, df)


class AzureLazyBatchCommitTable:
    """
    Wrapper around an Azure table which allows for convenient insertion via lazy batch execution per partition.
    Uses a priority queue to manage the order in which partitions are committed.
    To execute the insertions, call one of the commit methods, e.g. :func:`AzureLazyBatchCommitTable.commit_blocking_until_empty`
    """
    AZURE_ALLOWED_TABLE_NAME_PATTERN = re.compile("^[A-Za-z][A-Za-z0-9]{2,62}$")
    AZURE_ALLOWED_TABLE_BATCH_SIZE = 100

    class PartitionCommandsPriorityQueue:

        class PartitionCommands:
def __init__(self, partition_key):
self.partition_key = partition_key
self._command_list = collections.deque()
def __len__(self):
return len(self._command_list)

            def append(self, command):
                self._command_list.append(command)

            def execute(self, context_manager: Callable[[], TableBatch], batch_size: int):
while len(self._command_list) > 0:
_slice = [self._command_list.popleft() for _ in range(min(batch_size, len(self._command_list)))]
_log.info(f"Committing {len(_slice)} cache entries to the database")
with context_manager() as batch:
for command in _slice:
command(batch)
def __init__(self):
self.partition_commands_queue = []
self.partition_key2_commands = {}
self._thread_lock = threading.Lock()

        def add_command(self, partition_key, command: Union[Callable[[TableBatch], Any], functools.partial[TableBatch]]):
            """
            Adds a command to the queue of the corresponding partition_key
            :param partition_key: the partition key under which to queue the command
            :param command: a callable on a TableBatch
            """
with self._thread_lock:
if partition_key not in self.partition_key2_commands:
commands = self.PartitionCommands(partition_key)
self.partition_commands_queue.append(commands)
self.partition_key2_commands[partition_key] = commands
self.partition_key2_commands[partition_key].append(command)

        def pop(self, min_length: int = None) -> Optional[AzureLazyBatchCommitTable.PartitionCommandsPriorityQueue.PartitionCommands]:
            """
            :param min_length: minimal length of the largest PartitionCommands instance for the pop to take place
            :return: the largest PartitionCommands instance, or None if the minimal length is not reached
            """
            with self._thread_lock:
                return self._pop(min_length)

        def pop_all(self):
            with self._thread_lock:
                commands_list = []
                while not self._is_empty():
                    commands_list.append(self._pop())
                return commands_list

        def is_empty(self):
            with self._thread_lock:
                return self._is_empty()
def _pop(self, min_length=None):
length, index = self._get_max_priority_info()
if index is not None and (min_length is None or length >= min_length):
q = self.partition_commands_queue.pop(index)
del self.partition_key2_commands[q.partition_key]
return q
else:
return None
def _is_empty(self):
return len(self.partition_commands_queue) == 0
def _get_max_priority_info(self):
lengths_list = list(map(len, self.partition_commands_queue))
if len(lengths_list) == 0:
return 0, None
max_length = max(lengths_list)
return max_length, lengths_list.index(max_length)
def __init__(self, table_name: str, table_service: TableService, property_loaders: Sequence[PropertyLoader] = ()):
"""
:param table_name: name of table
:param table_service: instance of :class:`azure.storage.table.TableService` to connect to Azure table storage
:param property_loaders:
"""
if not self.AZURE_ALLOWED_TABLE_NAME_PATTERN.match(table_name):
raise ValueError(f"Invalid table name {table_name}, see: "
f"https://docs.microsoft.com/en-us/rest/api/storageservices/Understanding-the-Table-Service-Data-Model")
self.table_service = table_service
self.table_name = table_name
self.property_loaders = property_loaders
self._partition_queues = self.PartitionCommandsPriorityQueue()
self._context_manager = functools.partial(self.table_service.batch, self.table_name)
if not self.exists():
self.table_service.create_table(self.table_name)

    def insert_or_replace_entity(self, entity: Union[Dict, Entity]):
        """
        Lazy wrapper method for :func:`azure.storage.table.TableService.insert_or_replace_entity`
        :param entity: the entity (or dictionary) to insert or replace
        """
partition_key = entity["PartitionKey"]
for property_loader in self.property_loaders:
property_loader.write_property_value(entity)
execution_command = functools.partial(self._insert_or_replace_entity_via_batch, entity)
self._partition_queues.add_command(partition_key, execution_command)

    def insert_entity(self, entity: Union[Dict, Entity]):
        """
        Lazy wrapper method for :func:`azure.storage.table.TableService.insert_entity`
        :param entity: the entity (or dictionary) to insert
        """
partition_key = entity["PartitionKey"]
for property_loader in self.property_loaders:
property_loader.write_property_value(entity)
execution_command = functools.partial(self._insert_entity_via_batch, entity)
self._partition_queues.add_command(partition_key, execution_command)

    def get_entity(self, partition_key: str, row_key: str) -> Optional[Entity]:
        """
        Wraps :func:`azure.storage.table.TableService.get_entity`
        :param partition_key: the partition key of the entity
        :param row_key: the row key of the entity
        :return: the entity, or None if it could not be retrieved
        """
try:
entity = self.table_service.get_entity(self.table_name, partition_key, row_key)
for property_loader in self.property_loaders:
property_loader.load_property_value(entity)
return entity
except Exception as e:
_log.debug(f"Unable to load value for partitionKey {partition_key} and rowKey {row_key} from table {self.table_name}: {e}")
return None

    def commit_blocking_until_empty(self, max_batch_size=AZURE_ALLOWED_TABLE_BATCH_SIZE):
        """
        Commits insertion commands in a blocking manner. Commands are executed batch-wise per partition until the
        partition queue is empty.
        :param max_batch_size: maximal batch size to use for batch insertion; must be less than or equal to the batch size allowed by Azure
        """
max_batch_size = self._validate_max_batch_size(max_batch_size)
while not self._partition_queues.is_empty():
commands = self._partition_queues.pop()
commands.execute(self._context_manager, max_batch_size)

    def commit_non_blocking_current_queue_state(self, max_batch_size=AZURE_ALLOWED_TABLE_BATCH_SIZE):
        """
        Commits insertion commands in a non-blocking manner, emptying the current state of the partition commands queue
        in a separate thread. Commands are executed batch-wise per partition.
        :param max_batch_size: maximal batch size to use for batch insertion; must be less than or equal to the batch size allowed by Azure
        """
max_batch_size = self._validate_max_batch_size(max_batch_size)
def commit():
commands_list = self._partition_queues.pop_all()
for commands in commands_list:
commands.execute(self._context_manager, max_batch_size)
thread = threading.Thread(target=commit, daemon=False)
thread.start()

    def commit_blocking_largest_partition_from_queue(self, max_batch_size=AZURE_ALLOWED_TABLE_BATCH_SIZE, min_length=None):
        """
        Commits the largest partition from the partition commands queue in a blocking manner
        :param max_batch_size: maximal batch size to use for batch insertion; must be less than or equal to the batch size allowed by Azure
        :param min_length: minimal size of the largest partition. If not None, pop and commit only if min_length is reached.
        """
max_batch_size = self._validate_max_batch_size(max_batch_size)
commands = self._partition_queues.pop(min_length)
if commands is not None:
commands.execute(self._context_manager, max_batch_size)
def _validate_max_batch_size(self, max_batch_size):
if max_batch_size > self.AZURE_ALLOWED_TABLE_BATCH_SIZE:
            _log.warning(f"Provided max_batch_size is larger than allowed size {self.AZURE_ALLOWED_TABLE_BATCH_SIZE}. "
                f"Will use max_batch_size {self.AZURE_ALLOWED_TABLE_BATCH_SIZE} instead.")
max_batch_size = self.AZURE_ALLOWED_TABLE_BATCH_SIZE
return max_batch_size

    def load_table_to_data_frame(self, columns: List[str] = None, row_filter_query: str = None, num_records: int = None):
        """
        Loads all rows of the table into a :class:`~pandas.DataFrame`
        :param row_filter_query: optional filter query to restrict the returned rows
        :param num_records: optional maximal number of records to load
        :param columns: restrict loading to the provided columns
        :return: the resulting :class:`~pandas.DataFrame`
        """
if num_records is None:
records = list(self._iter_records(columns, row_filter_query))
else:
records = []
for record in self._iter_records(columns, row_filter_query):
records.append(record)
if len(records) >= num_records:
break
df = pd.DataFrame(records, columns=columns)
for property_loader in self.property_loaders:
property_loader.load_property_value_to_data_frame_column(df)
return df

    def iter_data_frame_chunks(self, chunk_size: int, columns: List[str] = None, row_filter_query: str = None):
        """
        Returns a generator of dataframe chunks
        :param row_filter_query: optional filter query to restrict the returned rows
        :param chunk_size: number of rows per chunk
        :param columns: restrict loading to the provided columns
        :return: a generator of :class:`~pandas.DataFrame` chunks
        """
records = []
for record in self._iter_records(columns, row_filter_query):
records.append(record)
if len(records) >= chunk_size:
df = pd.DataFrame(records, columns=columns)
                for property_loader in self.property_loaders:
                    property_loader.load_property_value_to_data_frame_column(df)
yield df
records = []

    def iter_records(self, columns: List[str] = None, row_filter_query: str = None):
        """
        Returns a generator of table entities
        :param row_filter_query: optional filter query to restrict the returned rows
        :param columns: restrict loading to the provided columns
        :return: a generator of table entities
        """
        for entity in self._iter_records(columns, row_filter_query):
            for property_loader in self.property_loaders:
                property_loader.load_property_value(entity)
            yield entity
def _iter_records(self, columns: Optional[List[str]], row_filter_query: Optional[str]):
column_names_as_comma_separated_string = None
if columns is not None:
column_names_as_comma_separated_string = ",".join(columns)
return self.table_service.query_entities(self.table_name, select=column_names_as_comma_separated_string,
filter=row_filter_query)

    def insert_data_frame_to_table(self, df: pd.DataFrame, partition_key_generator: Callable[[str], str] = None, num_records: int = None):
        """
        Inserts or replaces the entities of the table corresponding to the rows of the DataFrame, where the DataFrame's
        index acts as the row key. Values of object-type columns in the DataFrame may have to be serialised via JSON beforehand.
        :param df: DataFrame to be inserted
        :param partition_key_generator: callable to generate a partition key from the row key; if None, partition keys default to the table name
        :param num_records: restrict insertion to the first num_records rows, merely for testing
        """
for (count, (idx, row)) in enumerate(df.iterrows()):
if num_records is not None:
if count >= num_records:
break
entity = row.to_dict()
entity["RowKey"] = idx
entity["PartitionKey"] = self.table_name if partition_key_generator is None else partition_key_generator(idx)
self.insert_or_replace_entity(entity)
@staticmethod
def _insert_or_replace_entity_via_batch(entity, batch: TableBatch):
return batch.insert_or_replace_entity(entity)
@staticmethod
def _insert_entity_via_batch(entity, batch: TableBatch):
return batch.insert_entity(entity)

    def exists(self):
return self.table_service.exists(self.table_name)
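

# Usage sketch (illustrative; the account credentials and entity values are placeholders): entities are
# queued per partition via the insert methods and written to Azure table storage only when a commit
# method is called.
def _example_lazy_batch_commit_table():
    table_service = TableService(account_name="<account>", account_key="<key>")
    table = AzureLazyBatchCommitTable("myTable", table_service)
    table.insert_or_replace_entity({"PartitionKey": "myTable", "RowKey": "row1", "cache_value": "42"})
    table.insert_or_replace_entity({"PartitionKey": "myTable", "RowKey": "row2", "cache_value": "43"})
    table.commit_blocking_until_empty()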


class AzureTablePersistentKeyValueCache(PersistentKeyValueCache):
"""
PersistentKeyValueCache using Azure Table Storage, see https://docs.microsoft.com/en-gb/azure/storage/tables/
"""
CACHE_VALUE_IDENTIFIER = "cache_value"
def __init__(self, table_service: TableService, table_name="cache", partition_key_generator: Callable[[str], str] = None,
max_batch_size=100, min_size_for_periodic_commit: Optional[int] = 100, deferred_commit_delay_secs=1.0, in_memory=False,
blob_backend: AzureTableBlobBackend = None, serialiser: Serialiser = None, max_workers: int = None):
"""
:param table_service: https://docs.microsoft.com/en-us/python/api/azure-cosmosdb-table/azure.cosmosdb.table.tableservice.tableservice?view=azure-python
:param table_name: name of table, needs to match restrictions for Azure storage resources, see https://docs.microsoft.com/en-gb/azure/azure-resource-manager/management/resource-name-rules
:param partition_key_generator: callable to generate a partitionKey from provided string, if None partitionKey in requests defaults
to tableName
:param max_batch_size: maximal batch size for each commit.
:param deferred_commit_delay_secs: the time frame during which no new data must be added for a pending transaction to be committed
:param min_size_for_periodic_commit: minimal size of a batch to be committed in a periodic thread.
If None, commits are only executed in a deferred manner, i.e. commit only if there is no update for `deferred_commit_delay_secs`
        :param in_memory: boolean flag indicating whether the table should be loaded into memory at construction
        :param blob_backend: if not None, blob storage will be used to store the actual values, and the cache_value property in the table will only contain a reference
:param max_workers: maximal number of workers to load data from blob backend
"""
self._deferredCommitDelaySecs = deferred_commit_delay_secs
self._partitionKeyGenerator = partition_key_generator
def create_property_loaders():
if blob_backend is None and serialiser is None:
_property_loaders = ()
elif blob_backend is None and serialiser is not None:
_property_loaders = (SerialisedPropertyLoader(self.CACHE_VALUE_IDENTIFIER, serialiser),)
elif blob_backend is not None and serialiser is None:
property_blob_status_name = self.CACHE_VALUE_IDENTIFIER + "_blob_backed"
_property_loaders = (BlobBackedPropertyLoader(self.CACHE_VALUE_IDENTIFIER, blob_backend, table_name,
property_blob_status_name, max_workers),)
else:
property_blob_status_name = self.CACHE_VALUE_IDENTIFIER + "_blob_backed"
_property_loaders = (BlobBackedSerialisedPropertyLoader(self.CACHE_VALUE_IDENTIFIER, serialiser, blob_backend,
table_name, property_blob_status_name, max_workers),)
return _property_loaders
property_loaders = create_property_loaders()
self._batch_commit_table = AzureLazyBatchCommitTable(table_name, table_service, property_loaders=property_loaders)
self._minSizeForPeriodicCommit = min_size_for_periodic_commit
self._maxBatchSize = max_batch_size
self._updateHook = PeriodicUpdateHook(deferred_commit_delay_secs, no_update_fn=self._commit, periodic_fn=self._periodically_commit)
self._in_memory_cache = None
if in_memory:
df = self._batch_commit_table.load_table_to_data_frame(columns=['RowKey', self.CACHE_VALUE_IDENTIFIER]).set_index("RowKey")
_log.info(f"Loaded {len(df)} entries of table {table_name} in memory")
self._in_memory_cache = df[self.CACHE_VALUE_IDENTIFIER].to_dict()

    def set(self, key, value):
key_as_string = str(key)
partition_key = self._get_partition_key_for_row_key(key_as_string)
entity = {'PartitionKey': partition_key, 'RowKey': key_as_string, self.CACHE_VALUE_IDENTIFIER: value}
self._batch_commit_table.insert_or_replace_entity(entity)
self._updateHook.handle_update()
if self._in_memory_cache is not None:
self._in_memory_cache[key_as_string] = value

    def get(self, key):
key_as_string = str(key)
value = self._get_from_in_memory_cache(key_as_string)
if value is None:
value = self._get_from_table(key_as_string)
return value
def _get_from_table(self, key: str):
partition_key = self._get_partition_key_for_row_key(key)
entity = self._batch_commit_table.get_entity(partition_key, key)
if entity is not None:
return entity[self.CACHE_VALUE_IDENTIFIER]
return None
def _get_from_in_memory_cache(self, key):
if self._in_memory_cache is None:
return None
return self._in_memory_cache.get(str(key), None)
def _get_partition_key_for_row_key(self, key: str):
return self._batch_commit_table.table_name if self._partitionKeyGenerator is None else self._partitionKeyGenerator(key)
def _commit(self):
self._batch_commit_table.commit_non_blocking_current_queue_state(self._maxBatchSize)
def _periodically_commit(self):
self._batch_commit_table.commit_blocking_largest_partition_from_queue(self._maxBatchSize, self._minSizeForPeriodicCommit)
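

# Usage sketch (illustrative; account credentials and names are placeholders): the cache combines the
# lazy batch commit table with an optional serialiser and blob backend behind a simple set/get interface.
# Note that writes are committed to the table in a deferred, batched manner.
def _example_persistent_key_value_cache():
    table_service = TableService(account_name="<account>", account_key="<key>")
    blob_service = BlockBlobService(account_name="<account>", account_key="<key>")
    cache = AzureTablePersistentKeyValueCache(table_service, table_name="predictioncache",
        serialiser=NumpyArrayJsonSerialiser(),
        blob_backend=JsonAzureTableBlobBackend(blob_service, "prediction-cache-blobs"))
    cache.set("someKey", np.array([1.0, 2.0, 3.0]))
    value = cache.get("someKey")  # may be None until the deferred commit has executed
    return value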