Commit 89bfe7a

feat(experimental): flush the last chunk in append method (#1699)
Previously, the last chunk was flushed only when the close() method was called. Now the flush happens inside the append() method itself.
1 parent a57ea0e commit 89bfe7a
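
For orientation, here is a minimal usage sketch (not part of the commit) of the behavior the description refers to: after this change, append() itself sends the final chunk with flush=True, so close() no longer needs to flush. The bucket and object names are placeholders, and the gRPC client is assumed to come from the experimental AsyncGrpcClient, as in the system test added below.

```python
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
    AsyncAppendableObjectWriter,
)


async def append_and_close(grpc_client, bucket_name, object_name, payload: bytes):
    # grpc_client is assumed to come from AsyncGrpcClient().grpc_client,
    # mirroring the system test added in this commit.
    writer = AsyncAppendableObjectWriter(grpc_client, bucket_name, object_name)
    await writer.open()

    # append() sends `payload` in chunks; the last chunk carries
    # flush=True and state_lookup=True, so the data is persisted here
    # rather than when close() is called.
    await writer.append(payload)

    # close() now only tears down the stream and returns the persisted size.
    # Pass finalize_on_close=True to finalize the object instead.
    return await writer.close()
```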

5 files changed: +162 -27 lines changed

google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py

Lines changed: 30 additions & 8 deletions
@@ -188,8 +188,9 @@ async def append(self, data: bytes) -> None:
         ie. `self.offset` bytes relative to the begining of the object.

         This method sends the provided `data` to the GCS server in chunks.
-        and persists data in GCS at every `_MAX_BUFFER_SIZE_BYTES` bytes by
-        calling `self.simple_flush`.
+        and persists data in GCS at every `_DEFAULT_FLUSH_INTERVAL_BYTES` bytes
+        or at the last chunk whichever is earlier. Persisting is done by setting
+        `flush=True` on request.

         :type data: bytes
         :param data: The bytes to append to the object.
@@ -214,20 +215,33 @@ async def append(self, data: bytes) -> None:
         while start_idx < total_bytes:
             end_idx = min(start_idx + _MAX_CHUNK_SIZE_BYTES, total_bytes)
             data_chunk = data[start_idx:end_idx]
+            is_last_chunk = end_idx == total_bytes
+            chunk_size = end_idx - start_idx
             await self.write_obj_stream.send(
                 _storage_v2.BidiWriteObjectRequest(
                     write_offset=self.offset,
                     checksummed_data=_storage_v2.ChecksummedData(
                         content=data_chunk,
                         crc32c=int.from_bytes(Checksum(data_chunk).digest(), "big"),
                     ),
+                    state_lookup=is_last_chunk,
+                    flush=is_last_chunk
+                    or (
+                        self.bytes_appended_since_last_flush + chunk_size
+                        >= self.flush_interval
+                    ),
                 )
             )
-            chunk_size = end_idx - start_idx
             self.offset += chunk_size
             self.bytes_appended_since_last_flush += chunk_size
+
             if self.bytes_appended_since_last_flush >= self.flush_interval:
-                await self.simple_flush()
+                self.bytes_appended_since_last_flush = 0
+
+            if is_last_chunk:
+                response = await self.write_obj_stream.recv()
+                self.persisted_size = response.persisted_size
+                self.offset = self.persisted_size
                 self.bytes_appended_since_last_flush = 0
             start_idx = end_idx

@@ -292,20 +306,24 @@ async def close(self, finalize_on_close=False) -> Union[int, _storage_v2.Object]
             raise ValueError("Stream is not open. Call open() before close().")

         if finalize_on_close:
-            await self.finalize()
-        else:
-            await self.flush()
+            return await self.finalize()

         await self.write_obj_stream.close()

         self._is_stream_open = False
         self.offset = None
-        return self.object_resource if finalize_on_close else self.persisted_size
+        return self.persisted_size

     async def finalize(self) -> _storage_v2.Object:
         """Finalizes the Appendable Object.

         Note: Once finalized no more data can be appended.
+        This method is different from `close`. if `.close()` is called data may
+        still be appended to object at a later point in time by opening with
+        generation number.
+        (i.e. `open(..., generation=<object_generation_number>)`.
+        However if `.finalize()` is called no more data can be appended to the
+        object.

         rtype: google.cloud.storage_v2.types.Object
         returns: The finalized object resource.
@@ -322,6 +340,10 @@ async def finalize(self) -> _storage_v2.Object:
         response = await self.write_obj_stream.recv()
         self.object_resource = response.resource
         self.persisted_size = self.object_resource.size
+        await self.write_obj_stream.close()
+
+        self._is_stream_open = False
+        self.offset = None
         return self.object_resource

     # helper methods.
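
To restate the flush logic above in isolation: each outgoing chunk request sets flush when either the configured interval is reached or the chunk is the last one, whichever comes first. The standalone helper below is only an illustrative restatement of that condition; it is not part of the library.

```python
def should_flush(
    is_last_chunk: bool,
    bytes_since_last_flush: int,
    chunk_size: int,
    flush_interval: int,
) -> bool:
    # Flush on the last chunk, or once the unflushed bytes (including the
    # current chunk) reach the flush interval, whichever comes first.
    return is_last_chunk or (bytes_since_last_flush + chunk_size >= flush_interval)
```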

noxfile.py

Lines changed: 8 additions & 1 deletion
@@ -192,7 +192,14 @@ def system(session):
     # 2021-05-06: defer installing 'google-cloud-*' to after this package,
     # in order to work around Python 2.7 googolapis-common-protos
     # issue.
-    session.install("mock", "pytest", "pytest-rerunfailures", "-c", constraints_path)
+    session.install(
+        "mock",
+        "pytest",
+        "pytest-rerunfailures",
+        "pytest-asyncio",
+        "-c",
+        constraints_path,
+    )
     session.install("-e", ".", "-c", constraints_path)
     session.install(
         "google-cloud-testutils",

tests/system/test_zonal.py

Lines changed: 53 additions & 0 deletions
@@ -307,3 +307,56 @@ async def _read_and_verify(expected_content, generation=None):
     del mrd
     del mrd_2
     gc.collect()
+
+
+@pytest.mark.asyncio
+async def test_append_flushes_and_state_lookup(storage_client, blobs_to_delete):
+    """
+    System test for AsyncAppendableObjectWriter, verifying flushing behavior
+    for both small and large appends.
+    """
+    object_name = f"test-append-flush-varied-size-{uuid.uuid4()}"
+    grpc_client = AsyncGrpcClient().grpc_client
+    writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
+
+    # Schedule for cleanup
+    blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
+
+    # --- Part 1: Test with small data ---
+    small_data = b"small data"
+
+    await writer.open()
+    assert writer._is_stream_open
+
+    await writer.append(small_data)
+    persisted_size = await writer.state_lookup()
+    assert persisted_size == len(small_data)
+
+    # --- Part 2: Test with large data ---
+    large_data = os.urandom(38 * 1024 * 1024)
+
+    # Append data larger than the default flush interval (16 MiB).
+    # This should trigger the interval-based flushing logic.
+    await writer.append(large_data)
+
+    # Verify the total data has been persisted.
+    total_size = len(small_data) + len(large_data)
+    persisted_size = await writer.state_lookup()
+    assert persisted_size == total_size
+
+    # --- Part 3: Finalize and verify ---
+    final_object = await writer.close(finalize_on_close=True)
+
+    assert not writer._is_stream_open
+    assert final_object.size == total_size
+
+    # Verify the full content of the object.
+    full_data = small_data + large_data
+    mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
+    buffer = BytesIO()
+    await mrd.open()
+    # (0, 0) means read the whole object
+    await mrd.download_ranges([(0, 0, buffer)])
+    await mrd.close()
+    content = buffer.getvalue()
+    assert content == full_data

tests/unit/asyncio/retry/test_writes_resumption_strategy.py

Lines changed: 3 additions & 1 deletion
@@ -206,7 +206,9 @@ def test_update_state_from_response(self):
         strategy.update_state_from_response(response2, state)
         self.assertEqual(write_state.persisted_size, 1024)

-        final_resource = storage_type.Object(name="test-object", bucket="b", size=2048, finalize_time=datetime.now())
+        final_resource = storage_type.Object(
+            name="test-object", bucket="b", size=2048, finalize_time=datetime.now()
+        )
         response3 = storage_type.BidiWriteObjectResponse(resource=final_resource)
         strategy.update_state_from_response(response3, state)
         self.assertEqual(write_state.persisted_size, 2048)

tests/unit/asyncio/test_async_appendable_object_writer.py

Lines changed: 68 additions & 17 deletions
@@ -364,6 +364,7 @@ async def test_close(mock_write_object_stream, mock_client):
     writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
     writer._is_stream_open = True
     writer.offset = 1024
+    writer.persisted_size = 1024
     mock_stream = mock_write_object_stream.return_value
     mock_stream.send = mock.AsyncMock()
     mock_stream.recv = mock.AsyncMock(
@@ -435,16 +436,20 @@ async def test_finalize(mock_write_object_stream, mock_client):
     mock_stream.recv = mock.AsyncMock(
         return_value=_storage_v2.BidiWriteObjectResponse(resource=mock_resource)
     )
+    mock_stream.close = mock.AsyncMock()

     gcs_object = await writer.finalize()

     mock_stream.send.assert_awaited_once_with(
         _storage_v2.BidiWriteObjectRequest(finish_write=True)
     )
     mock_stream.recv.assert_awaited_once()
+    mock_stream.close.assert_awaited_once()
     assert writer.object_resource == mock_resource
     assert writer.persisted_size == 123
     assert gcs_object == mock_resource
+    assert writer._is_stream_open is False
+    assert writer.offset is None


 @pytest.mark.asyncio
@@ -501,30 +506,39 @@ async def test_append_sends_data_in_chunks(mock_write_object_stream, mock_client
     writer.persisted_size = 100
     mock_stream = mock_write_object_stream.return_value
     mock_stream.send = mock.AsyncMock()
-    writer.simple_flush = mock.AsyncMock()

     data = b"a" * (_MAX_CHUNK_SIZE_BYTES + 1)
+    mock_stream.recv = mock.AsyncMock(
+        return_value=_storage_v2.BidiWriteObjectResponse(
+            persisted_size=100 + len(data)
+        )
+    )
+
     await writer.append(data)

     assert mock_stream.send.await_count == 2
-    first_call = mock_stream.send.await_args_list[0]
-    second_call = mock_stream.send.await_args_list[1]
+    first_request = mock_stream.send.await_args_list[0].args[0]
+    second_request = mock_stream.send.await_args_list[1].args[0]

     # First chunk
-    assert first_call[0][0].write_offset == 100
-    assert len(first_call[0][0].checksummed_data.content) == _MAX_CHUNK_SIZE_BYTES
-    assert first_call[0][0].checksummed_data.crc32c == int.from_bytes(
+    assert first_request.write_offset == 100
+    assert len(first_request.checksummed_data.content) == _MAX_CHUNK_SIZE_BYTES
+    assert first_request.checksummed_data.crc32c == int.from_bytes(
         Checksum(data[:_MAX_CHUNK_SIZE_BYTES]).digest(), byteorder="big"
     )
-    # Second chunk
-    assert second_call[0][0].write_offset == 100 + _MAX_CHUNK_SIZE_BYTES
-    assert len(second_call[0][0].checksummed_data.content) == 1
-    assert second_call[0][0].checksummed_data.crc32c == int.from_bytes(
+    assert not first_request.flush
+    assert not first_request.state_lookup
+
+    # Second chunk (last chunk)
+    assert second_request.write_offset == 100 + _MAX_CHUNK_SIZE_BYTES
+    assert len(second_request.checksummed_data.content) == 1
+    assert second_request.checksummed_data.crc32c == int.from_bytes(
         Checksum(data[_MAX_CHUNK_SIZE_BYTES:]).digest(), byteorder="big"
     )
+    assert second_request.flush
+    assert second_request.state_lookup

     assert writer.offset == 100 + len(data)
-    writer.simple_flush.assert_not_awaited()


 @pytest.mark.asyncio
@@ -541,12 +555,25 @@ async def test_append_flushes_when_buffer_is_full(
     writer.persisted_size = 0
     mock_stream = mock_write_object_stream.return_value
     mock_stream.send = mock.AsyncMock()
-    writer.simple_flush = mock.AsyncMock()
+    mock_stream.recv = mock.AsyncMock()

     data = b"a" * _DEFAULT_FLUSH_INTERVAL_BYTES
     await writer.append(data)

-    writer.simple_flush.assert_awaited_once()
+    num_chunks = _DEFAULT_FLUSH_INTERVAL_BYTES // _MAX_CHUNK_SIZE_BYTES
+    assert mock_stream.send.await_count == num_chunks
+
+    # All but the last request should not have flush or state_lookup set.
+    for i in range(num_chunks - 1):
+        request = mock_stream.send.await_args_list[i].args[0]
+        assert not request.flush
+        assert not request.state_lookup
+
+    # The last request should have flush and state_lookup set.
+    last_request = mock_stream.send.await_args_list[-1].args[0]
+    assert last_request.flush
+    assert last_request.state_lookup
+    assert writer.bytes_appended_since_last_flush == 0


 @pytest.mark.asyncio
@@ -561,12 +588,18 @@ async def test_append_handles_large_data(mock_write_object_stream, mock_client):
     writer.persisted_size = 0
     mock_stream = mock_write_object_stream.return_value
     mock_stream.send = mock.AsyncMock()
-    writer.simple_flush = mock.AsyncMock()
+    mock_stream.recv = mock.AsyncMock()

     data = b"a" * (_DEFAULT_FLUSH_INTERVAL_BYTES * 2 + 1)
     await writer.append(data)

-    assert writer.simple_flush.await_count == 2
+    flushed_requests = [
+        call.args[0] for call in mock_stream.send.await_args_list if call.args[0].flush
+    ]
+    assert len(flushed_requests) == 3
+
+    last_request = mock_stream.send.await_args_list[-1].args[0]
+    assert last_request.state_lookup


 @pytest.mark.asyncio
@@ -584,17 +617,35 @@ async def test_append_data_two_times(mock_write_object_stream, mock_client):
     writer.persisted_size = 0
     mock_stream = mock_write_object_stream.return_value
     mock_stream.send = mock.AsyncMock()
-    writer.simple_flush = mock.AsyncMock()

     data1 = b"a" * (_MAX_CHUNK_SIZE_BYTES + 10)
+    mock_stream.recv = mock.AsyncMock(
+        return_value=_storage_v2.BidiWriteObjectResponse(
+            persisted_size= len(data1)
+        )
+    )
     await writer.append(data1)

+    assert mock_stream.send.await_count == 2
+    last_request_data1 = mock_stream.send.await_args_list[-1].args[0]
+    assert last_request_data1.flush
+    assert last_request_data1.state_lookup
+
     data2 = b"b" * (_MAX_CHUNK_SIZE_BYTES + 20)
+    mock_stream.recv = mock.AsyncMock(
+        return_value=_storage_v2.BidiWriteObjectResponse(
+            persisted_size= len(data2) + len(data1)
+        )
+    )
     await writer.append(data2)

+    assert mock_stream.send.await_count == 4
+    last_request_data2 = mock_stream.send.await_args_list[-1].args[0]
+    assert last_request_data2.flush
+    assert last_request_data2.state_lookup
+
     total_data_length = len(data1) + len(data2)
     assert writer.offset == total_data_length
-    assert writer.simple_flush.await_count == 0


 @pytest.mark.asyncio
