cache_azure#


class Serialiser[source]#

Bases: ABC

Abstraction for mechanisms to serialise values which do not fit the table storage data model; see https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-the-table-service-data-model

abstract serialise(value) str[source]#
abstract deserialise(value: str)[source]#
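
As an illustration, a Serialiser for arbitrary picklable objects might be written as follows (a minimal sketch; the pickle/base64 round trip and the class name are assumptions for the example, not part of this module):

    import base64
    import pickle

    # hypothetical example: store any picklable object as a base64-encoded string,
    # since strings fit the table storage data model
    class PickleBase64Serialiser(Serialiser):
        def serialise(self, value) -> str:
            # pickle the value and encode the resulting bytes as an ASCII-safe string
            return base64.b64encode(pickle.dumps(value)).decode("ascii")

        def deserialise(self, value: str):
            # reverse the encoding and unpickle
            return pickle.loads(base64.b64decode(value.encode("ascii")))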
class NumpyArrayJsonSerialiser[source]#

Bases: Serialiser

Serialises a numpy array as a JSON string of its list representation

serialise(value: ndarray) str[source]#
deserialise(value: str)[source]#
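
For example, a round trip through this serialiser might look as follows (a sketch; the array values are purely illustrative):

    import numpy as np

    serialiser = NumpyArrayJsonSerialiser()

    arr = np.array([1.0, 2.5, 3.0])
    s = serialiser.serialise(arr)         # a JSON string of the list representation, e.g. "[1.0, 2.5, 3.0]"
    restored = serialiser.deserialise(s)  # expected to yield an equivalent array

    assert np.allclose(arr, restored)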
class PropertyLoader[source]#

Bases: ABC

Abstraction of a customised loader for an entity property

abstract load_property_value(entity: azure.storage.table.Entity)[source]#
abstract write_property_value(entity: azure.storage.table.Entity)[source]#
abstract load_property_value_to_data_frame_column(df: DataFrame)[source]#
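
A custom PropertyLoader might look as follows (a hypothetical sketch; it assumes, as the signatures suggest, that the hooks modify the given entity or DataFrame in place, and the property name is illustrative):

    # hypothetical loader which upper-cases a string property when reading and
    # lower-cases it when writing
    class UpperCasePropertyLoader(PropertyLoader):
        def __init__(self, property_name: str):
            self.property_name = property_name

        def load_property_value(self, entity):
            # applied after an entity has been read from the table
            if self.property_name in entity:
                entity[self.property_name] = entity[self.property_name].upper()

        def write_property_value(self, entity):
            # applied before an entity is written to the table
            if self.property_name in entity:
                entity[self.property_name] = entity[self.property_name].lower()

        def load_property_value_to_data_frame_column(self, df):
            # applied to the corresponding column when a table is loaded into a DataFrame
            if self.property_name in df.columns:
                df[self.property_name] = df[self.property_name].str.upper()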
class SerialisedPropertyLoader(property_name: str, serialiser: Serialiser)[source]#

Bases: PropertyLoader

PropertyLoader to serialise and de-serialise values. Useful if the type of the values is not aligned with the table storage data model; see https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-the-table-service-data-model

load_property_value(entity: azure.storage.table.Entity)[source]#
write_property_value(entity: azure.storage.table.Entity)[source]#
load_property_value_to_data_frame_column(df: DataFrame)[source]#
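
For instance, to store a numpy array property as a JSON string (a sketch; the property name is illustrative):

    # serialise the 'feature_vector' property via JSON before writing and de-serialise it after reading
    array_loader = SerialisedPropertyLoader("feature_vector", NumpyArrayJsonSerialiser())

    # property loaders such as this one can be passed to AzureLazyBatchCommitTable
    # (documented below) via its property_loaders argument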
class AzureTableBlobBackend[source]#

Bases: ABC

Abstraction of a blob backend which allows values stored in blob storage to be conveniently set and retrieved via a reference to the value

abstract get_value_from_reference(value_identifier: str)[source]#
abstract get_value_reference(partition_key: str, row_key: str, value_name: str, blob_name_prefix: Optional[str] = None) str[source]#
abstract set_value_for_reference(value_identifier: str, value)[source]#
class BlobPerKeyAzureTableBlobBackend(block_blob_service: azure.storage.blob.BlockBlobService, container_name: str)[source]#

Bases: AzureTableBlobBackend, ABC

Backend which stores serialised values as /tableName/partitionKey/rowKey/valueName.<fileExtension>, or as /tableName/rowKey/valueName.<fileExtension> if partitionKey equals tableName

Parameters:

block_blob_service – https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blockblobservice.blockblobservice?view=azure-python-previous

abstract property file_extension#
get_value_from_reference(value_identifier: str)[source]#
get_value_reference(partition_key: str, row_key: str, value_name: str, blob_name_prefix: Optional[str] = None) str[source]#
set_value_for_reference(value_identifier: str, value)[source]#
class TextDumpAzureTableBlobBackend(block_blob_service: azure.storage.blob.BlockBlobService, container_name: str)[source]#

Bases: BlobPerKeyAzureTableBlobBackend

Backend stores values as txt files in the structure /tableName/partitionKey/rowKey/valueName

Parameters:

block_blob_service – https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blockblobservice.blockblobservice?view=azure-python-previous

property file_extension#
class JsonAzureTableBlobBackend(block_blob_service: azure.storage.blob.BlockBlobService, container_name: str)[source]#

Bases: BlobPerKeyAzureTableBlobBackend

Backend stores values as json files in the structure /tableName/partitionKey/rowKey/valueName.json

Parameters:

block_blob_service – https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blockblobservice.blockblobservice?view=azure-python-previous

property file_extension#
class PickleAzureTableBlobBackend(block_blob_service: azure.storage.blob.BlockBlobService, container_name: str)[source]#

Bases: JsonAzureTableBlobBackend

Backend stores values as pickle files in the structure /tableName/partitionKey/rowKey/valueName.pickle

Parameters:

block_blob_service – https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blockblobservice.blockblobservice?view=azure-python-previous

property file_extension#
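
A blob backend might be constructed as follows (a sketch assuming the legacy azure-storage-blob 2.x package, which provides BlockBlobService; account details and the container name are placeholders):

    from azure.storage.blob import BlockBlobService  # legacy azure-storage-blob 2.x

    block_blob_service = BlockBlobService(account_name="myaccount", account_key="...")

    # store serialised values as JSON blobs in the given container
    json_backend = JsonAzureTableBlobBackend(block_blob_service, container_name="cachevalues")

    # alternatively, store pickled values
    pickle_backend = PickleAzureTableBlobBackend(block_blob_service, container_name="cachevalues")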
class BlobBackedPropertyLoader(property_name: str, blob_backend: AzureTableBlobBackend, blob_prefix: Optional[str] = None, property_boolean_blob_status_name: Optional[str] = None, max_workers=None)[source]#

Bases: PropertyLoader

PropertyLoader to write and read values from a blob backend via a reference to the value. Useful if values cannot be stored in table storage itself because they are not aligned with the table storage data model; see https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-the-table-service-data-model

Parameters:
  • property_name – name of property in table

  • property_boolean_blob_status_name – name of the property representing a boolean flag within the table which indicates whether the value is blob-backed. If None, each value is assumed to be blob-backed.

  • blob_backend – actual backend to use for storage

  • blob_prefix – prefix to use for blob in storage, e.g. a table name

  • max_workers – maximal number of workers to load data from blob storage

AZURE_ALLOWED_SIZE_PER_PROPERTY_BYTES = 64000#
AZURE_ALLOWED_STRING_LENGTH_PER_PROPERTY = 32000#

load_property_value(entity: azure.storage.table.Entity)[source]#
write_property_value(entity: azure.storage.table.Entity)[source]#
load_property_value_to_data_frame_column(df: DataFrame)[source]#
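
For example, a blob-backed loader for a large property might be set up as follows (a sketch; the property, flag and prefix names are illustrative, and pickle_backend refers to a backend such as the one sketched above):

    blob_loader = BlobBackedPropertyLoader(
        property_name="large_value",
        blob_backend=pickle_backend,  # an AzureTableBlobBackend instance
        blob_prefix="mytable",
        property_boolean_blob_status_name="large_value_is_blob_backed",
        max_workers=4,
    )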
class BlobBackedSerialisedPropertyLoader(property_name, serialiser: Serialiser, blob_backend: AzureTableBlobBackend, blob_prefix: Optional[str] = None, property_boolean_blob_status_name: Optional[str] = None, max_workers=None)[source]#

Bases: BlobBackedPropertyLoader, SerialisedPropertyLoader

Property loader, which combines serialisation and blob backing.

Parameters:
  • property_name – name of property in table

  • serialiser – the Serialiser to apply to the property's values

  • property_boolean_blob_status_name – name of the property representing a boolean flag within the table which indicates whether the value is blob-backed. If None, each value is assumed to be blob-backed.

  • blob_backend – actual backend to use for storage

  • blob_prefix – prefix to use for blob in storage, e.g. a table name

  • max_workers – maximal number of workers to load data from blob storage

load_property_value(entity: azure.storage.table.Entity)[source]#
write_property_value(entity: azure.storage.table.Entity)[source]#
load_property_value_to_data_frame_column(df: DataFrame)[source]#
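
Analogously, a loader which both serialises a numpy array and stores the result in blob storage might look like this (a sketch; names are illustrative, and json_backend refers to a backend such as the one sketched above):

    array_blob_loader = BlobBackedSerialisedPropertyLoader(
        "feature_vector",
        serialiser=NumpyArrayJsonSerialiser(),
        blob_backend=json_backend,
        blob_prefix="mytable",
    )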
class AzureLazyBatchCommitTable(table_name: str, table_service: azure.storage.table.TableService, property_loaders: Sequence[PropertyLoader] = ())[source]#

Bases: object

Wrapper for an Azure table which allows for convenient insertion via lazy batch execution per partition. Uses a priority queue to manage the order in which partitions are committed. To execute the insertions, call one of the commit methods, e.g. commit_blocking_until_empty().

Parameters:
  • table_name – name of table

  • table_service – instance of azure.storage.table.TableService to connect to Azure table storage

  • property_loaders – sequence of PropertyLoader instances to apply when entities are read from or written to the table

AZURE_ALLOWED_TABLE_NAME_PATTERN = re.compile('^[A-Za-z][A-Za-z0-9]{2,62}$')#
AZURE_ALLOWED_TABLE_BATCH_SIZE = 100#
class PartitionCommandsPriorityQueue[source]#

Bases: object

class PartitionCommands(partition_key)[source]#

Bases: object

append(command)[source]#
execute(context_manager: Callable[[], azure.storage.table.TableBatch], batch_size: int)[source]#
add_command(partition_key, command: Union[Callable[[TableBatch], Any], functools.partial[TableBatch]])[source]#

Add a command to the queue of the corresponding partition key

Parameters:
  • partition_key – the partition key whose command queue the command is appended to

  • command – a callable on a TableBatch

pop(min_length: Optional[int] = None) Optional[PartitionCommands][source]#
Parameters:

min_length – minimal length of largest PartitionCommands for the pop to take place.

Returns:

largest PartitionCommands or None if minimal length is not reached

pop_all()[source]#
is_empty()[source]#
insert_or_replace_entity(entity: Union[Dict, azure.storage.table.Entity])[source]#

Lazy wrapper method for azure.storage.table.TableService.insert_or_replace_entity()

Parameters:

entity – the entity to insert or replace

insert_entity(entity: Union[Dict, azure.storage.table.Entity])[source]#

Lazy wrapper method for azure.storage.table.TableService.insert_entity()

Parameters:

entity – the entity to insert

get_entity(partition_key: str, row_key: str) Optional[azure.storage.table.Entity][source]#

Wraps azure.storage.table.TableService.get_entity()

Parameters:
  • partition_key

  • row_key

Returns:

the entity, or None if no entity with the given keys exists

commit_blocking_until_empty(max_batch_size=100)[source]#

Commit insertion commands in a blocking manner. Commands are executed batch-wise per partition until the partition queue is empty.

Parameters:

max_batch_size – maximal batch size to use for batch insertion; must be less than or equal to the batch size allowed by Azure

commit_non_blocking_current_queue_state(max_batch_size=100)[source]#

Commit insertion commands in a non-blocking manner, emptying the current state of the PartitionCommandsPriorityQueue. Commands are executed batch-wise per partition.

Parameters:

max_batch_size – maximal batch size to use for batch insertion; must be less than or equal to the batch size allowed by Azure

commit_blocking_largest_partition_from_queue(max_batch_size=100, min_length=None)[source]#

Commits the largest partition from the PartitionCommandsPriorityQueue in a blocking manner

Parameters:
  • max_batch_size – maximal batch size to use for batch insertion; must be less than or equal to the batch size allowed by Azure

  • min_length – minimal size of the largest partition. If not None, pop and commit only if min_length is reached.

load_table_to_data_frame(columns: Optional[List[str]] = None, row_filter_query: Optional[str] = None, num_records: Optional[int] = None)[source]#

Load all rows of the table into a DataFrame

Parameters:
  • columns – restrict loading to the provided columns

  • row_filter_query

  • num_records

Returns:

DataFrame

iter_data_frame_chunks(chunk_size: int, columns: Optional[List[str]] = None, row_filter_query: Optional[str] = None)[source]#

Get a generator of DataFrame chunks

Parameters:
  • chunk_size

  • columns

  • row_filter_query

iter_records(columns: Optional[List[str]] = None, row_filter_query: Optional[str] = None)[source]#

Get a generator of table entities

Parameters:
  • columns

  • row_filter_query

insert_data_frame_to_table(df: DataFrame, partition_key_generator: Optional[Callable[[str], str]] = None, num_records: Optional[int] = None)[source]#

Inserts or replaces entities of the table corresponding to the rows of the DataFrame, where the index of the DataFrame acts as the row key. Values of object-type columns in the DataFrame may have to be serialised via JSON beforehand.

Parameters:
  • df – DataFrame to be inserted

  • partition_key_generator – if None, partition keys default to the table name

  • num_records – restrict insertion to the first num_records rows, merely for testing

exists()[source]#
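
Putting this together, typical usage might look as follows (a sketch assuming the azure.storage.table TableService referenced in the signatures above; account details, table and property names are placeholders):

    from azure.storage.table import TableService  # package providing TableService, as referenced above

    table_service = TableService(account_name="myaccount", account_key="...")
    table = AzureLazyBatchCommitTable("mytable", table_service)

    # queue insertions lazily; nothing is written yet
    table.insert_or_replace_entity({"PartitionKey": "mytable", "RowKey": "row1", "value": 42})
    table.insert_or_replace_entity({"PartitionKey": "mytable", "RowKey": "row2", "value": 43})

    # execute the queued commands batch-wise per partition
    table.commit_blocking_until_empty()

    # read the table back into a DataFrame
    df = table.load_table_to_data_frame()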
class AzureTablePersistentKeyValueCache(table_service: azure.storage.table.TableService, table_name='cache', partition_key_generator: Optional[Callable[[str], str]] = None, max_batch_size=100, min_size_for_periodic_commit: Optional[int] = 100, deferred_commit_delay_secs=1.0, in_memory=False, blob_backend: Optional[AzureTableBlobBackend] = None, serialiser: Optional[Serialiser] = None, max_workers: Optional[int] = None)[source]#

Bases: PersistentKeyValueCache

PersistentKeyValueCache using Azure Table Storage, see https://docs.microsoft.com/en-gb/azure/storage/tables/

Parameters:
  • table_service – https://docs.microsoft.com/en-us/python/api/azure-cosmosdb-table/azure.cosmosdb.table.tableservice.tableservice?view=azure-python

  • table_name – name of table, needs to match restrictions for Azure storage resources, see https://docs.microsoft.com/en-gb/azure/azure-resource-manager/management/resource-name-rules

  • partition_key_generator – callable to generate a partition key from a provided string; if None, the partition key in requests defaults to the table name

  • max_batch_size – maximal batch size for each commit.

  • deferred_commit_delay_secs – the time frame during which no new data must be added for a pending transaction to be committed

  • min_size_for_periodic_commit – minimal size of a batch to be committed in a periodic thread. If None, commits are only executed in a deferred manner, i.e. commit only if there is no update for deferred_commit_delay_secs

  • in_memory – boolean flag indicating whether the table should be loaded into memory at construction

  • blob_backend – if not None, blob storage will be used to store the actual value, and cache_value in the table will only contain a reference

  • serialiser – if not None, a Serialiser to use for the serialisation of cache values

  • max_workers – maximal number of workers to load data from blob backend

CACHE_VALUE_IDENTIFIER = 'cache_value'#
set(key, value)[source]#

Sets a cached value

Parameters:
  • key – the key under which to store the value

  • value – the value to store; since None is used to indicate the absence of a value, None should not be used as a value

get(key)[source]#

Retrieves a cached value

Parameters:

key – the lookup key

Returns:

the cached value or None if no value is found
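
A minimal usage sketch (same assumption about the table service package as above; account details and the table name are placeholders):

    from azure.storage.table import TableService

    table_service = TableService(account_name="myaccount", account_key="...")
    cache = AzureTablePersistentKeyValueCache(table_service, table_name="mycache")

    cache.set("somekey", 42)
    value = cache.get("somekey")  # the cached value, or None if no value is stored for the key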