Skip to content

Commit f7095fa

Browse files
authored
feat: make flush size configurable (#1677)
feat: make flush size configurable
1 parent a0668ec commit f7095fa

File tree

3 files changed

+139
-9
lines changed

3 files changed

+139
-9
lines changed

google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from typing import Optional, Union
2525

2626
from google_crc32c import Checksum
27+
from google.api_core import exceptions
2728

2829
from ._utils import raise_if_no_fast_crc32c
2930
from google.cloud import _storage_v2
@@ -36,7 +37,7 @@
3637

3738

3839
_MAX_CHUNK_SIZE_BYTES = 2 * 1024 * 1024 # 2 MiB
39-
_MAX_BUFFER_SIZE_BYTES = 16 * 1024 * 1024 # 16 MiB
40+
_DEFAULT_FLUSH_INTERVAL_BYTES = 16 * 1024 * 1024 # 16 MiB
4041

4142

4243
class AsyncAppendableObjectWriter:
@@ -49,6 +50,7 @@ def __init__(
4950
object_name: str,
5051
generation=None,
5152
write_handle=None,
53+
writer_options: Optional[dict] = None,
5254
):
5355
"""
5456
Class for appending data to a GCS Appendable Object.
@@ -125,6 +127,21 @@ def __init__(
125127
# Please note: `offset` and `persisted_size` are same when the stream is
126128
# opened.
127129
self.persisted_size: Optional[int] = None
130+
if writer_options is None:
131+
writer_options = {}
132+
self.flush_interval = writer_options.get(
133+
"FLUSH_INTERVAL_BYTES", _DEFAULT_FLUSH_INTERVAL_BYTES
134+
)
135+
# TODO: add test case for this.
136+
if self.flush_interval < _MAX_CHUNK_SIZE_BYTES:
137+
raise exceptions.OutOfRange(
138+
f"flush_interval must be >= {_MAX_CHUNK_SIZE_BYTES} , but provided {self.flush_interval}"
139+
)
140+
if self.flush_interval % _MAX_CHUNK_SIZE_BYTES != 0:
141+
raise exceptions.OutOfRange(
142+
f"flush_interval must be a multiple of {_MAX_CHUNK_SIZE_BYTES}, but provided {self.flush_interval}"
143+
)
144+
self.bytes_appended_since_last_flush = 0
128145

129146
async def state_lookup(self) -> int:
130147
"""Returns the persisted_size
@@ -193,7 +210,6 @@ async def append(self, data: bytes) -> None:
193210
self.offset = self.persisted_size
194211

195212
start_idx = 0
196-
bytes_to_flush = 0
197213
while start_idx < total_bytes:
198214
end_idx = min(start_idx + _MAX_CHUNK_SIZE_BYTES, total_bytes)
199215
data_chunk = data[start_idx:end_idx]
@@ -208,10 +224,10 @@ async def append(self, data: bytes) -> None:
208224
)
209225
chunk_size = end_idx - start_idx
210226
self.offset += chunk_size
211-
bytes_to_flush += chunk_size
212-
if bytes_to_flush >= _MAX_BUFFER_SIZE_BYTES:
227+
self.bytes_appended_since_last_flush += chunk_size
228+
if self.bytes_appended_since_last_flush >= self.flush_interval:
213229
await self.simple_flush()
214-
bytes_to_flush = 0
230+
self.bytes_appended_since_last_flush = 0
215231
start_idx = end_idx
216232

217233
async def simple_flush(self) -> None:

tests/system/test_zonal.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
1414
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
1515
AsyncAppendableObjectWriter,
16+
_DEFAULT_FLUSH_INTERVAL_BYTES,
1617
)
1718
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
1819
AsyncMultiRangeDownloader,
@@ -162,6 +163,59 @@ async def test_basic_wrd_in_slices(storage_client, blobs_to_delete, object_size)
162163
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
163164

164165

166+
@pytest.mark.asyncio
167+
@pytest.mark.parametrize(
168+
"flush_interval",
169+
[2 * 1024 * 1024, 4 * 1024 * 1024, 8 * 1024 * 1024, _DEFAULT_FLUSH_INTERVAL_BYTES],
170+
)
171+
async def test_wrd_with_non_default_flush_interval(
172+
storage_client,
173+
blobs_to_delete,
174+
flush_interval,
175+
):
176+
object_name = f"test_basic_wrd-{str(uuid.uuid4())}"
177+
object_size = 9 * 1024 * 1024
178+
179+
# Client instantiation; it cannot be part of a fixture because
180+
# grpc_client's event loop and event loop of coroutine running it
181+
# (i.e. this test) must be the same.
182+
# Note:
183+
# 1. @pytest.mark.asyncio ensures new event loop for each test.
184+
# 2. we can keep the same event loop for the entire module but that may
185+
# create issues if tests are run in parallel and one test hogs the event
186+
# loop slowing down other tests.
187+
object_data = os.urandom(object_size)
188+
object_checksum = google_crc32c.value(object_data)
189+
grpc_client = AsyncGrpcClient().grpc_client
190+
191+
writer = AsyncAppendableObjectWriter(
192+
grpc_client,
193+
_ZONAL_BUCKET,
194+
object_name,
195+
writer_options={"FLUSH_INTERVAL_BYTES": flush_interval},
196+
)
197+
await writer.open()
198+
mark1, mark2 = _get_equal_dist(0, object_size)
199+
await writer.append(object_data[0:mark1])
200+
await writer.append(object_data[mark1:mark2])
201+
await writer.append(object_data[mark2:])
202+
object_metadata = await writer.close(finalize_on_close=True)
203+
assert object_metadata.size == object_size
204+
assert int(object_metadata.checksums.crc32c) == object_checksum
205+
206+
mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
207+
buffer = BytesIO()
208+
await mrd.open()
209+
# (0, 0) means read the whole object
210+
await mrd.download_ranges([(0, 0, buffer)])
211+
await mrd.close()
212+
assert buffer.getvalue() == object_data
213+
assert mrd.persisted_size == object_size
214+
215+
# Clean up; use json client (i.e. `storage_client` fixture) to delete.
216+
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
217+
218+
165219
@pytest.mark.asyncio
166220
async def test_read_unfinalized_appendable_object(storage_client, blobs_to_delete):
167221
object_name = f"read_unfinalized_appendable_object-{str(uuid.uuid4())[:4]}"

tests/unit/asyncio/test_async_appendable_object_writer.py

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
2222
AsyncAppendableObjectWriter,
2323
)
24+
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
25+
_MAX_CHUNK_SIZE_BYTES,
26+
)
2427
from google.cloud import _storage_v2
2528

2629

@@ -29,6 +32,7 @@
2932
GENERATION = 123
3033
WRITE_HANDLE = b"test-write-handle"
3134
PERSISTED_SIZE = 456
35+
EIGHT_MIB = 8 * 1024 * 1024
3236

3337

3438
@pytest.fixture
@@ -52,6 +56,7 @@ def test_init(mock_write_object_stream, mock_client):
5256
assert not writer._is_stream_open
5357
assert writer.offset is None
5458
assert writer.persisted_size is None
59+
assert writer.bytes_appended_since_last_flush == 0
5560

5661
mock_write_object_stream.assert_called_once_with(
5762
client=mock_client,
@@ -78,6 +83,7 @@ def test_init_with_optional_args(mock_write_object_stream, mock_client):
7883

7984
assert writer.generation == GENERATION
8085
assert writer.write_handle == WRITE_HANDLE
86+
assert writer.bytes_appended_since_last_flush == 0
8187

8288
mock_write_object_stream.assert_called_once_with(
8389
client=mock_client,
@@ -88,6 +94,60 @@ def test_init_with_optional_args(mock_write_object_stream, mock_client):
8894
)
8995

9096

97+
@mock.patch(
98+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
99+
)
100+
def test_init_with_writer_options(mock_write_object_stream, mock_client):
101+
"""Test the constructor with optional arguments."""
102+
writer = AsyncAppendableObjectWriter(
103+
mock_client,
104+
BUCKET,
105+
OBJECT,
106+
writer_options={"FLUSH_INTERVAL_BYTES": EIGHT_MIB},
107+
)
108+
109+
assert writer.flush_interval == EIGHT_MIB
110+
assert writer.bytes_appended_since_last_flush == 0
111+
112+
mock_write_object_stream.assert_called_once_with(
113+
client=mock_client,
114+
bucket_name=BUCKET,
115+
object_name=OBJECT,
116+
generation_number=None,
117+
write_handle=None,
118+
)
119+
120+
121+
@mock.patch(
122+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
123+
)
124+
def test_init_with_flush_interval_less_than_chunk_size_raises_error(mock_client):
125+
"""Test that an OutOfRange error is raised if flush_interval is less than the chunk size."""
126+
127+
with pytest.raises(exceptions.OutOfRange):
128+
AsyncAppendableObjectWriter(
129+
mock_client,
130+
BUCKET,
131+
OBJECT,
132+
writer_options={"FLUSH_INTERVAL_BYTES": _MAX_CHUNK_SIZE_BYTES - 1},
133+
)
134+
135+
136+
@mock.patch(
137+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
138+
)
139+
def test_init_with_flush_interval_not_multiple_of_chunk_size_raises_error(mock_client):
140+
"""Test that an OutOfRange error is raised if flush_interval is not a multiple of the chunk size."""
141+
142+
with pytest.raises(exceptions.OutOfRange):
143+
AsyncAppendableObjectWriter(
144+
mock_client,
145+
BUCKET,
146+
OBJECT,
147+
writer_options={"FLUSH_INTERVAL_BYTES": _MAX_CHUNK_SIZE_BYTES + 1},
148+
)
149+
150+
91151
@mock.patch("google.cloud.storage._experimental.asyncio._utils.google_crc32c")
92152
@mock.patch(
93153
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
@@ -477,7 +537,7 @@ async def test_append_flushes_when_buffer_is_full(
477537
):
478538
"""Test that append flushes the stream when the buffer size is reached."""
479539
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
480-
_MAX_BUFFER_SIZE_BYTES,
540+
_DEFAULT_FLUSH_INTERVAL_BYTES,
481541
)
482542

483543
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
@@ -487,7 +547,7 @@ async def test_append_flushes_when_buffer_is_full(
487547
mock_stream.send = mock.AsyncMock()
488548
writer.simple_flush = mock.AsyncMock()
489549

490-
data = b"a" * _MAX_BUFFER_SIZE_BYTES
550+
data = b"a" * _DEFAULT_FLUSH_INTERVAL_BYTES
491551
await writer.append(data)
492552

493553
writer.simple_flush.assert_awaited_once()
@@ -500,7 +560,7 @@ async def test_append_flushes_when_buffer_is_full(
500560
async def test_append_handles_large_data(mock_write_object_stream, mock_client):
501561
"""Test that append handles data larger than the buffer size."""
502562
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
503-
_MAX_BUFFER_SIZE_BYTES,
563+
_DEFAULT_FLUSH_INTERVAL_BYTES,
504564
)
505565

506566
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
@@ -510,7 +570,7 @@ async def test_append_handles_large_data(mock_write_object_stream, mock_client):
510570
mock_stream.send = mock.AsyncMock()
511571
writer.simple_flush = mock.AsyncMock()
512572

513-
data = b"a" * (_MAX_BUFFER_SIZE_BYTES * 2 + 1)
573+
data = b"a" * (_DEFAULT_FLUSH_INTERVAL_BYTES * 2 + 1)
514574
await writer.append(data)
515575

516576
assert writer.simple_flush.await_count == 2

0 commit comments

Comments
 (0)