Mutations Batching
User friendly container for Google Cloud Bigtable MutationBatcher.
exception google.cloud.bigtable.batcher.MutationsBatchError(message, exc)
Bases: Exception
Error in the batch request
class google.cloud.bigtable.batcher.MutationsBatcher(table, flush_count=100, max_row_bytes=20971520, flush_interval=1, batch_completed_callback=None)
Bases: object
A MutationsBatcher is used in batch cases where the number of mutations
is large or unknown. It will store DirectRow
in memory until one of the
size limits is reached, or an explicit call to flush()
is performed. When
a flush event occurs, the DirectRow
in memory will be sent to Cloud
Bigtable. Batching mutations is more efficient than sending individual
request.
This class is not suited for usage in systems where each mutation
must be guaranteed to be sent, since calling mutate may only result in an
in-memory change. In a case of a system crash, any DirectRow
remaining in
memory will not necessarily be sent to the service, even after the
completion of the mutate()
method.
Note on thread safety: The same MutationBatcher
cannot be shared by multiple end-user threads.
Parameters
table (class) – class:~google.cloud.bigtable.table.Table.
flush_count (int) – (Optional) Max number of rows to flush. If it reaches the max number of rows it calls finish_batch() to mutate the current row batch. Default is FLUSH_COUNT (1000 rows).
max_row_bytes (int) – (Optional) Max number of row mutations size to flush. If it reaches the max number of row mutations size it calls finish_batch() to mutate the current row batch. Default is MAX_ROW_BYTES (5 MB).
flush_interval (float) – (Optional) The interval (in seconds) between asynchronous flush. Default is 1 second.
batch_completed_callback (Callable[list:[~google.rpc.status_pb2.Status]] = None) – (Optional) A callable for handling responses after the current batch is sent. The callable function expect a list of grpc Status.
_enter_()
Starting the MutationsBatcher as a context manager
_exit_(exc_type, exc_value, exc_traceback)
Clean up resources. Flush and shutdown the ThreadPoolExecutor.
close()
Clean up resources. Flush and shutdown the ThreadPoolExecutor. Any errors will be raised.
Raises
batcherMutationsBatchError
if there’s any error in the mutations.
flush()
Sends the current batch to Cloud Bigtable synchronously. For example:
from google.cloud.bigtable import Client
client = Client(admin=True)
instance = client.instance(INSTANCE_ID)
table = instance.table(TABLE_ID)
# Batcher for max row bytes, max_row_bytes=1024 is optional.
batcher = table.mutations_batcher(max_row_bytes=1024)
# Add a single row
row_key = b"row_key"
row = table.row(row_key)
row.set_cell(COLUMN_FAMILY_ID, COL_NAME1, "value-0")
# In batcher, mutate will flush current batch if it
# reaches the max_row_bytes
batcher.mutate(row)
batcher.flush()
Raises
batcherMutationsBatchError
if there’s any error in the mutations.
mutate(row)
Add a row to the batch. If the current batch meets one of the size limits, the batch is sent asynchronously.
For example:
from google.cloud.bigtable import Client
client = Client(admin=True)
instance = client.instance(INSTANCE_ID)
table = instance.table(TABLE_ID)
# Batcher for max row bytes, max_row_bytes=1024 is optional.
batcher = table.mutations_batcher(max_row_bytes=1024)
# Add a single row
row_key = b"row_key_1"
row = table.row(row_key)
row.set_cell(
COLUMN_FAMILY_ID, COL_NAME1, "value-0", timestamp=datetime.datetime.utcnow()
)
# In batcher, mutate will flush current batch if it
# reaches the max_row_bytes
batcher.mutate(row)
batcher.flush()
Parameters
row (class) –
DirectRow
.Raises
One of the following: *
_BigtableRetryableError
if any row returned a transient error. *RuntimeError
if the number of responses doesn’t match the number of rows that were retried
mutate_rows(rows)
Add multiple rows to the batch. If the current batch meets one of the size limits, the batch is sent asynchronously.
For example:
from google.cloud.bigtable import Client
client = Client(admin=True)
instance = client.instance(INSTANCE_ID)
table = instance.table(TABLE_ID)
batcher = table.mutations_batcher()
row1 = table.row(b"row_key_1")
row2 = table.row(b"row_key_2")
row3 = table.row(b"row_key_3")
row4 = table.row(b"row_key_4")
row1.set_cell(COLUMN_FAMILY_ID, COL_NAME1, b"cell-val1")
row2.set_cell(COLUMN_FAMILY_ID, COL_NAME1, b"cell-val2")
row3.set_cell(COLUMN_FAMILY_ID, COL_NAME1, b"cell-val3")
row4.set_cell(COLUMN_FAMILY_ID, COL_NAME1, b"cell-val4")
batcher.mutate_rows([row1, row2, row3, row4])
# batcher will flush current batch if it
# reaches the max flush_count
# Manually send the current batch to Cloud Bigtable
batcher.flush()
Parameters
rows (list:[~google.cloud.bigtable.row.DirectRow]) – list:[~google.cloud.bigtable.row.DirectRow].
Raises
One of the following: *
_BigtableRetryableError
if any row returned a transient error. *RuntimeError
if the number of responses doesn’t match the number of rows that were retried