cache_azure#
Source code: sensai/util/cache_azure.py
- class Serialiser[source]#
Bases:
ABC
Abstraction for mechanisms to serialise values which do not fit the table storage data model; see https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-the-table-service-data-model
- class NumpyArrayJsonSerialiser[source]#
Bases:
Serialiser
Serialises a numpy array as a JSON string of its (nested) list representation
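The scheme itself can be illustrated independently of the class; the following is a minimal standalone sketch of a JSON round trip for a numpy array via its nested-list representation (it does not show the class's actual method names):

```python
import json

import numpy as np

arr = np.array([[1.0, 2.0], [3.0, 4.0]])

# serialise: JSON string of the nested-list representation of the array
serialised = json.dumps(arr.tolist())  # '[[1.0, 2.0], [3.0, 4.0]]'

# de-serialise: parse the JSON and rebuild the numpy array
restored = np.asarray(json.loads(serialised))
assert np.array_equal(arr, restored)
```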
- class SerialisedPropertyLoader(property_name: str, serialiser: Serialiser)[source]#
Bases:
PropertyLoader
PropertyLoader which serialises and de-serialises values. Useful if the type of the values is not aligned with the table storage data model; see https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-the-table-service-data-model
- class AzureTableBlobBackend[source]#
Bases:
ABC
Abstraction of a blob backend which allows for conveniently setting and getting values stored in blob storage via a reference to the value
- class BlobPerKeyAzureTableBlobBackend(block_blob_service: azure.storage.blob.BlockBlobService, container_name: str)[source]#
Bases:
AzureTableBlobBackend, ABC
Backend which stores serialised values as /tableName/partitionKey/rowKey/valueName.<fileExtension> or, if partitionKey equals tableName, as /tableName/rowKey/valueName.<fileExtension>
- Parameters:
block_blob_service – https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blockblobservice.blockblobservice?view=azure-python-previous
- abstract property file_extension#
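To make the naming scheme above concrete, here is an illustrative standalone helper (not part of the library) that reproduces the stated path convention:

```python
def blob_path(table_name: str, partition_key: str, row_key: str, value_name: str,
        file_extension: str) -> str:
    """Illustrative helper mirroring the documented blob naming scheme; not part of the library."""
    if partition_key == table_name:
        # the partition key segment is dropped when it equals the table name
        return f"/{table_name}/{row_key}/{value_name}.{file_extension}"
    return f"/{table_name}/{partition_key}/{row_key}/{value_name}.{file_extension}"


print(blob_path("cache", "cache", "key1", "cache_value", "json"))   # /cache/key1/cache_value.json
print(blob_path("cache", "part01", "key1", "cache_value", "json"))  # /cache/part01/key1/cache_value.json
```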
- class TextDumpAzureTableBlobBackend(block_blob_service: azure.storage.blob.BlockBlobService, container_name: str)[source]#
Bases:
BlobPerKeyAzureTableBlobBackend
Backend stores values as txt files in the structure /tableName/partitionKey/rowKey/valueName
- Parameters:
block_blob_service – https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blockblobservice.blockblobservice?view=azure-python-previous
- property file_extension#
- class JsonAzureTableBlobBackend(block_blob_service: azure.storage.blob.BlockBlobService, container_name: str)[source]#
Bases:
BlobPerKeyAzureTableBlobBackend
Backend stores values as json files in the structure /tableName/partitionKey/rowKey/valueName.json
- Parameters:
block_blob_service – https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blockblobservice.blockblobservice?view=azure-python-previous
- property file_extension#
- class PickleAzureTableBlobBackend(block_blob_service: azure.storage.blob.BlockBlobService, container_name: str)[source]#
Bases:
JsonAzureTableBlobBackend
Backend stores values as pickle files in the structure /tableName/partitionKey/rowKey/valueName.pickle
- Parameters:
block_blob_service – https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blockblobservice.blockblobservice?view=azure-python-previous
- property file_extension#
- class BlobBackedPropertyLoader(property_name: str, blob_backend: AzureTableBlobBackend, blob_prefix: Optional[str] = None, property_boolean_blob_status_name: Optional[str] = None, max_workers=None)[source]#
Bases:
PropertyLoader
PropertyLoader which writes and reads values from a blob backend via a reference to the value. Useful if values cannot be stored in table storage itself because they are not aligned with the table storage data model; see https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-the-table-service-data-model
- Parameters:
property_name – name of the property in the table
property_boolean_blob_status_name – name of a property representing a boolean flag within the table which indicates whether the value is blob-backed. If None, each value is assumed to be blob-backed.
blob_backend – actual backend to use for storage
blob_prefix – prefix to use for the blob in storage, e.g. a table name
max_workers – maximal number of workers to load data from blob storage
- AZURE_ALLOWED_SIZE_PER_PROPERTY_BYTES = 64000#
- AZURE_ALLOWED_STRING_LENGTH_PER_PROPERTY = 32000#
- class BlobBackedSerialisedPropertyLoader(property_name, serialiser: Serialiser, blob_backend: AzureTableBlobBackend, blob_prefix: Optional[str] = None, property_boolean_blob_status_name: Optional[str] = None, max_workers=None)[source]#
Bases:
BlobBackedPropertyLoader, SerialisedPropertyLoader
Property loader, which combines serialisation and blob backing.
- Parameters:
property_name – name of the property in the table
serialiser – the Serialiser to use for serialising and de-serialising values
property_boolean_blob_status_name – name of a property representing a boolean flag within the table which indicates whether the value is blob-backed. If None, each value is assumed to be blob-backed.
blob_backend – actual backend to use for storage
blob_prefix – prefix to use for the blob in storage, e.g. a table name
max_workers – maximal number of workers to load data from blob storage
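As a construction sketch (using only the constructor parameters documented above), a blob-backed serialised property loader might be assembled as follows; the connection string, container name and property names are placeholders, and BlockBlobService refers to the legacy azure-storage-blob API linked in the parameter documentation:

```python
from azure.storage.blob import BlockBlobService  # legacy azure-storage-blob API

from sensai.util.cache_azure import (
    BlobBackedSerialisedPropertyLoader,
    JsonAzureTableBlobBackend,
    NumpyArrayJsonSerialiser,
)

# placeholder connection details
block_blob_service = BlockBlobService(connection_string="<storage connection string>")

# store serialised values as JSON blobs in the given container
blob_backend = JsonAzureTableBlobBackend(block_blob_service, container_name="cachecontainer")

# serialise numpy arrays and keep only a reference in the table itself
property_loader = BlobBackedSerialisedPropertyLoader(
    property_name="cache_value",
    serialiser=NumpyArrayJsonSerialiser(),
    blob_backend=blob_backend,
    blob_prefix="mycache",  # e.g. the table name
    property_boolean_blob_status_name=None,  # treat every value as blob-backed
)
```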
- class AzureLazyBatchCommitTable(table_name: str, table_service: azure.storage.table.TableService, property_loaders: Sequence[PropertyLoader] = ())[source]#
Bases:
object
Wrapper around an Azure table which allows for convenient insertion via lazy batch execution per partition. Uses a priority queue to manage the order in which partitions are committed. To execute the queued insertions, call one of the commit methods of this class (e.g. commit_blocking_until_empty()).
- Parameters:
table_name – name of the table
table_service – instance of azure.storage.table.TableService used to connect to Azure table storage
property_loaders – sequence of PropertyLoader instances to apply when reading and writing entities
- AZURE_ALLOWED_TABLE_NAME_PATTERN = re.compile('^[A-Za-z][A-Za-z0-9]{2,62}$')#
- AZURE_ALLOWED_TABLE_BATCH_SIZE = 100#
- class PartitionCommandsPriorityQueue[source]#
Bases:
object
- add_command(partition_key, command: Union[Callable[[TableBatch], Any], functools.partial[TableBatch]])[source]#
Add a command to the queue of the corresponding partition key
- Parameters:
partition_key –
command – a callable on a TableBatch
- pop(min_length: Optional[int] = None) Optional[PartitionCommands] [source]#
- Parameters:
min_length – minimal length of largest PartitionCommands for the pop to take place.
- Returns:
the largest PartitionCommands, or None if the minimal length is not reached
- insert_or_replace_entity(entity: Union[Dict, azure.storage.table.Entity])[source]#
Lazy wrapper method for azure.storage.table.TableService.insert_or_replace_entity()
- Parameters:
entity –
- insert_entity(entity: Union[Dict, azure.storage.table.Entity])[source]#
Lazy wrapper method for azure.storage.table.TableService.insert_entity()
- Parameters:
entity –
- get_entity(partition_key: str, row_key: str) Optional[azure.storage.table.Entity] [source]#
Wraps azure.storage.table.TableService.get_entity()
- Parameters:
partition_key –
row_key –
- Returns:
the corresponding Entity, or None if no matching entity exists
- commit_blocking_until_empty(max_batch_size=100)[source]#
Commit insertion commands in a blocking manner. Commands are executed batch-wise per partition until the partition queue is empty.
- Parameters:
max_batch_size – maximal batch size to use for batch insertion; must be less than or equal to the batch size allowed by Azure
- commit_non_blocking_current_queue_state(max_batch_size=100)[source]#
Commit insertion commands. Empties the current PartitionCommandsQueue in a non-blocking way. Commands are executed batch-wise per partition.
- Parameters:
max_batch_size – maximal batch size to use for batch insertion; must be less than or equal to the batch size allowed by Azure
- commit_blocking_largest_partition_from_queue(max_batch_size=100, min_length=None)[source]#
Commits the largest partition from the PartitionCommandsQueue in a blocking way
- Parameters:
max_batch_size – maximal batch size to use for batch insertion; must be less than or equal to the batch size allowed by Azure
min_length – minimal size of the largest partition. If not None, pop and commit only if min_length is reached.
- load_table_to_data_frame(columns: Optional[List[str]] = None, row_filter_query: Optional[str] = None, num_records: Optional[int] = None)[source]#
Load all rows of the table into a DataFrame
- Parameters:
columns – restrict loading to the provided columns
row_filter_query –
num_records –
- Returns:
DataFrame
- iter_data_frame_chunks(chunk_size: int, columns: Optional[List[str]] = None, row_filter_query: Optional[str] = None)[source]#
Get a generator of DataFrame chunks
- Parameters:
chunk_size –
columns –
row_filter_query –
- iter_records(columns: Optional[List[str]] = None, row_filter_query: Optional[str] = None)[source]#
Get a generator of table entities
- Parameters:
columns –
row_filter_query –
- insert_data_frame_to_table(df: DataFrame, partition_key_generator: Optional[Callable[[str], str]] = None, num_records: Optional[int] = None)[source]#
Inserts or replaces the entities of the table corresponding to the rows of the DataFrame, where the index of the DataFrame acts as the row key. Values of object-type columns in the DataFrame may have to be serialised via JSON beforehand.
- Parameters:
df – DataFrame to be inserted
partition_key_generator – if None, partition keys default to the table name
num_records – restrict insertion to the first num_records rows, merely for testing
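A usage sketch for the class, restricted to the methods documented above; the connection string and table name are placeholders, and the entity dictionaries follow the standard Azure table data model with PartitionKey and RowKey:

```python
from azure.storage.table import TableService  # the azure-cosmosdb-table package provides an equivalent TableService

from sensai.util.cache_azure import AzureLazyBatchCommitTable

table_service = TableService(connection_string="<storage connection string>")  # placeholder credentials
table = AzureLazyBatchCommitTable("mycache", table_service)

# queue insertions lazily; nothing is sent to Azure yet
table.insert_or_replace_entity({"PartitionKey": "mycache", "RowKey": "key1", "cache_value": "42"})
table.insert_or_replace_entity({"PartitionKey": "mycache", "RowKey": "key2", "cache_value": "43"})

# execute the queued commands batch-wise per partition, blocking until the queue is empty
table.commit_blocking_until_empty(max_batch_size=100)

# read back a single entity and the full table as a DataFrame
entity = table.get_entity("mycache", "key1")
df = table.load_table_to_data_frame(columns=["cache_value"])
```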
- class AzureTablePersistentKeyValueCache(table_service: azure.storage.table.TableService, table_name='cache', partition_key_generator: Optional[Callable[[str], str]] = None, max_batch_size=100, min_size_for_periodic_commit: Optional[int] = 100, deferred_commit_delay_secs=1.0, in_memory=False, blob_backend: Optional[AzureTableBlobBackend] = None, serialiser: Optional[Serialiser] = None, max_workers: Optional[int] = None)[source]#
Bases:
PersistentKeyValueCache
PersistentKeyValueCache using Azure Table Storage, see https://docs.microsoft.com/en-gb/azure/storage/tables/
- Parameters:
table_service – https://docs.microsoft.com/en-us/python/api/azure-cosmosdb-table/azure.cosmosdb.table.tableservice.tableservice?view=azure-python
table_name – name of table, needs to match restrictions for Azure storage resources, see https://docs.microsoft.com/en-gb/azure/azure-resource-manager/management/resource-name-rules
partition_key_generator – callable to generate a partition key from a provided string; if None, the partition key in requests defaults to the table name
max_batch_size – maximal batch size for each commit.
deferred_commit_delay_secs – the time frame during which no new data must be added for a pending transaction to be committed
min_size_for_periodic_commit – minimal size of a batch to be committed in a periodic thread. If None, commits are only executed in a deferred manner, i.e. commit only if there is no update for deferred_commit_delay_secs
in_memory – boolean flag indicating whether the table should be loaded into memory at construction
blob_backend – if not None, blob storage will be used to store the actual value, and cache_value in the table will only contain a reference
max_workers – maximal number of workers to load data from the blob backend
- CACHE_VALUE_IDENTIFIER = 'cache_value'#
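Finally, a minimal usage sketch for the cache; the connection string is a placeholder, and the set/get calls assume the generic PersistentKeyValueCache interface of sensAI (those method names are an assumption, as they are not documented on this page):

```python
import numpy as np
from azure.storage.table import TableService

from sensai.util.cache_azure import AzureTablePersistentKeyValueCache, NumpyArrayJsonSerialiser

table_service = TableService(connection_string="<storage connection string>")  # placeholder credentials

cache = AzureTablePersistentKeyValueCache(
    table_service,
    table_name="mycache",
    deferred_commit_delay_secs=1.0,
    serialiser=NumpyArrayJsonSerialiser(),  # store numpy arrays as JSON strings
)

# assumed PersistentKeyValueCache interface: store and retrieve values by key
cache.set("key1", np.arange(10))
value = cache.get("key1")
```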