feat(samples): add samples for appendable objects writes and reads #1705
Conversation
Here is the summary of changes. You are about to add 8 region tags.
This comment is generated by snippet-bot.
Summary of Changes

Hello @chandra-siri, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly expands the sample code for Google Cloud Storage, focusing on experimental asynchronous operations for appendable objects and advanced ranged read capabilities. The new samples provide practical examples for developers to understand and utilize features like creating and appending to objects, finalizing uploads, pausing and resuming uploads, and performing various types of ranged reads, including concurrent reads across multiple objects and real-time "tailing" of appendable data streams. These additions aim to enhance the utility and clarity of the client library's asynchronous functionality.
Code Review
This pull request introduces several new sample files demonstrating appendable object writes and reads. The samples are well-structured and cover a range of use cases. My review focuses on improving resource management by ensuring that streams are always closed, even in the case of errors, and on correcting some inconsistencies in comments and docstrings. Overall, these are great additions to the sample library.
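The cleanup pattern this review keeps returning to (close the stream in a `finally` block so it runs on both the success and the error path) can be sketched without the Storage library at all. In this sketch, `DummyStream` and `read_with_cleanup` are hypothetical stand-ins for any async stream with `open()`/`close()` methods; only the control flow matches the review's recommendation:

```python
import asyncio


class DummyStream:
    """Minimal stand-in for an async stream with open()/close()."""

    def __init__(self):
        self.is_stream_open = False

    async def open(self):
        self.is_stream_open = True

    async def close(self):
        self.is_stream_open = False


async def read_with_cleanup(stream: DummyStream, fail: bool):
    try:
        await stream.open()
        if fail:
            raise RuntimeError("simulated error during download")
    finally:
        # Runs on success and on error alike, so the stream never leaks.
        if stream.is_stream_open:
            await stream.close()


ok = DummyStream()
bad = DummyStream()


async def main():
    await read_with_cleanup(ok, fail=False)
    try:
        await read_with_cleanup(bad, fail=True)
    except RuntimeError:
        pass


asyncio.run(main())
print("ok closed:", not ok.is_stream_open, "bad closed:", not bad.is_stream_open)
```

The `if stream.is_stream_open` guard mirrors the suggestions below: it avoids a second close if `open()` itself failed before the stream was established.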
```python
# Open the object in read mode.
await mrd.open()

output_buffer = BytesIO()
# A download range of (0, 0) means to read from the beginning to the end.
await mrd.download_ranges([(0, 0, output_buffer)])

await mrd.close()
```
To ensure the downloader stream is always closed, even if errors occur, it's best to wrap the `open` and `download_ranges` calls in a `try...finally` block. This guarantees that `mrd.close()` is called for proper resource cleanup.
Suggested change:

```python
try:
    # Open the object in read mode.
    await mrd.open()
    output_buffer = BytesIO()
    # A download range of (0, 0) means to read from the beginning to the end.
    await mrd.download_ranges([(0, 0, output_buffer)])
finally:
    if mrd.is_stream_open:
        await mrd.close()
```
```python
await writer.open()
print(f"Created empty appendable object: {object_name}")

# 2. Create the appender and tailer coroutines.
appender_task = asyncio.create_task(appender(writer, duration))
# # Add a small delay to ensure the object is created before tailing begins.
# await asyncio.sleep(1)
tailer_task = asyncio.create_task(tailer(bucket_name, object_name, duration))

# 3. Execute the coroutines concurrently.
await asyncio.gather(appender_task, tailer_task)

await writer.close()
print("Writer closed.")
```
To ensure the writer stream is always closed, especially if an error occurs in the appender or tailer tasks, the main logic should be wrapped in a `try...finally` block. This guarantees that `writer.close()` is called, preventing resource leaks.
Suggested change:

```python
try:
    # 1. Create an empty appendable object.
    await writer.open()
    print(f"Created empty appendable object: {object_name}")
    # 2. Create the appender and tailer coroutines.
    appender_task = asyncio.create_task(appender(writer, duration))
    # # Add a small delay to ensure the object is created before tailing begins.
    # await asyncio.sleep(1)
    tailer_task = asyncio.create_task(tailer(bucket_name, object_name, duration))
    # 3. Execute the coroutines concurrently.
    await asyncio.gather(appender_task, tailer_task)
finally:
    if writer._is_stream_open:
        await writer.close()
        print("Writer closed.")
```
```python
async def tailer(bucket_name: str, object_name: str, duration: int):
    """Tails the object by reading new data as it is appended."""
    print("Tailer started.")
    start_byte = 0
    client = AsyncGrpcClient().grpc_client
    start_time = time.monotonic()
    mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name)
    await mrd.open()
    # Run the tailer for the specified duration.
    while time.monotonic() - start_time < duration:
        output_buffer = BytesIO()
        # A download range of (start, 0) means to read from 'start' to the end.
        await mrd.download_ranges([(start_byte, 0, output_buffer)])

        bytes_downloaded = output_buffer.getbuffer().nbytes
        if bytes_downloaded > 0:
            now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
            print(
                f"[{now}] Tailer read {bytes_downloaded} new bytes: {output_buffer.getvalue()}"
            )
            start_byte += bytes_downloaded

        await asyncio.sleep(0.1)  # Poll for new data every second.
    print("Tailer finished.")
```
This function has two issues:

- The `AsyncMultiRangeDownloader` (`mrd`) is not closed if an error occurs. It's crucial to wrap the stream operations in a `try...finally` block to ensure `mrd.close()` is always called.
- The comment on line 66 is misleading. The polling interval is 0.1 seconds, not every second.
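Setting the cleanup question aside, the tail-read loop itself (poll, read everything from the last offset, advance the offset) can be exercised without the Storage client. In this sketch a plain `bytearray` is a stand-in for the remote appendable object, and the `(start, 0)` "read to end" range becomes a simple slice:

```python
import asyncio

# A plain bytearray stands in for the remote appendable object.
source = bytearray()
tailed = bytearray()


async def appender(ticks: int):
    for _ in range(ticks):
        source.extend(b"x" * 10)  # append 10 bytes per tick
        await asyncio.sleep(0.01)


async def tailer(ticks: int):
    start_byte = 0
    for _ in range(ticks * 5):
        # Read everything from the last offset to the current end,
        # mirroring a (start, 0) "read to end" download range.
        chunk = bytes(source[start_byte:])
        if chunk:
            tailed.extend(chunk)
            start_byte += len(chunk)  # advance past what we've already seen
        await asyncio.sleep(0.005)


async def main():
    await asyncio.gather(appender(5), tailer(5))
    # Drain anything appended after the tailer's last poll.
    tailed.extend(source[len(tailed):])


asyncio.run(main())
print(len(source), len(tailed))
```

Because `start_byte` only advances by the number of bytes actually read, the tailer never re-reads or skips data even when the two coroutines interleave unpredictably.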
Suggested change:

```python
async def tailer(bucket_name: str, object_name: str, duration: int):
    """Tails the object by reading new data as it is appended."""
    print("Tailer started.")
    start_byte = 0
    client = AsyncGrpcClient().grpc_client
    start_time = time.monotonic()
    mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name)
    try:
        await mrd.open()
        # Run the tailer for the specified duration.
        while time.monotonic() - start_time < duration:
            output_buffer = BytesIO()
            # A download range of (start, 0) means to read from 'start' to the end.
            await mrd.download_ranges([(start_byte, 0, output_buffer)])
            bytes_downloaded = output_buffer.getbuffer().nbytes
            if bytes_downloaded > 0:
                now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                print(
                    f"[{now}] Tailer read {bytes_downloaded} new bytes: {output_buffer.getvalue()}"
                )
                start_byte += bytes_downloaded
            await asyncio.sleep(0.1)  # Poll for new data every 0.1 seconds.
    finally:
        if mrd.is_stream_open:
            await mrd.close()
    print("Tailer finished.")
```

```python
mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name)

# Open the object in read mode.
await mrd.open()

# Each object downloads the first 100 bytes.
start_byte = 0
size = 100

# requested range will be downloaded into this buffer, user may provide
# their own buffer or file-like object.
output_buffer = BytesIO()
await mrd.download_ranges([(start_byte, size, output_buffer)])

await mrd.close()
```
To ensure that the `AsyncMultiRangeDownloader` stream is always closed, even if an error occurs during processing, it's best practice to wrap the operations in a `try...finally` block. This guarantees resource cleanup and prevents potential leaks.
Suggested change:

```python
mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name)
try:
    # Open the object in read mode.
    await mrd.open()
    # Each object downloads the first 100 bytes.
    start_byte = 0
    size = 100
    # requested range will be downloaded into this buffer, user may provide
    # their own buffer or file-like object.
    output_buffer = BytesIO()
    await mrd.download_ranges([(start_byte, size, output_buffer)])
finally:
    if mrd.is_stream_open:
        await mrd.close()
```
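Since the same `try...finally` shape recurs across these samples, one design alternative is to package the guarantee once with `contextlib.asynccontextmanager` and reuse it. This is a sketch, not part of the library's API; `opened_stream` and `DemoDownloader` are hypothetical names:

```python
import asyncio
from contextlib import asynccontextmanager


class DemoDownloader:
    """Hypothetical stand-in for a downloader with open()/close()."""

    def __init__(self):
        self.is_stream_open = False

    async def open(self):
        self.is_stream_open = True

    async def close(self):
        self.is_stream_open = False


@asynccontextmanager
async def opened_stream(stream):
    """Open the stream on entry and always close it on exit."""
    await stream.open()
    try:
        yield stream
    finally:
        if stream.is_stream_open:
            await stream.close()


downloader = DemoDownloader()


async def main():
    async with opened_stream(downloader) as mrd:
        assert mrd.is_stream_open  # the stream is usable inside the block


asyncio.run(main())
print("closed after exit:", not downloader.is_stream_open)
```

An `async with` at each call site then replaces the repeated `try...finally`, which keeps the samples short without losing the cleanup guarantee.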
```python
await writer2.open()

# 5. Append some more data using the new writer.
await writer2.append(b"Second part of the data.")
print(
    f"Appended more data. Total size is now {writer2.persisted_size} bytes."
)

# 6. Finally, close the new writer.
await writer2.close()
```
Similar to the first writer, the operations for `writer2` should be wrapped in a `try...finally` block. This ensures that `writer2.close()` is called, preventing resource leaks if an error occurs after the stream is opened.
Suggested change:

```python
# 4. Open the new writer.
try:
    await writer2.open()
    # 5. Append some more data using the new writer.
    await writer2.append(b"Second part of the data.")
    print(
        f"Appended more data. Total size is now {writer2.persisted_size} bytes."
    )
finally:
    # 6. Finally, close the new writer.
    if writer2._is_stream_open:
        await writer2.close()
```
```python
async def storage_create_and_write_appendable_object(bucket_name, object_name):
    """Uploads a appendable object to zonal bucket."""
```
```python
# [START storage_read_appendable_object_tail]
async def appender(writer: AsyncAppendableObjectWriter, duration: int):
    """Appends 1 byte to the object every second for a given duration."""
```
The docstring is inaccurate. The code appends 10 bytes in each iteration, not 1 byte. Please update the docstring to reflect the actual behavior.
Suggested change:

```python
"""Appends 10 bytes to the object every second for a given duration."""
```
```python
    """Appends 1 byte to the object every second for a given duration."""
    print("Appender started.")
    for i in range(duration):
        await writer.append(os.urandom(10))  # Append 1 random byte.
```
```python
# finalize the appendable object,
# once finalized no more appends can be done to the object.
```
The comment has a minor grammatical issue. It would be clearer if it were a complete sentence.
Suggested change:

```python
# Finalize the appendable object.
# Once finalized, no more appends can be done to the object.
```
```python
# Once all appends are done, closes the gRPC bidirectional stream.
await writer.close()

print('Appended object {} created of size {} bytes.'.format(object_name, writer.persisted_size))
```
For better readability and consistency with modern Python practices, it's recommended to use an f-string for string formatting instead of `.format()`.
Suggested change:

```python
print(f'Appended object {object_name} created of size {writer.persisted_size} bytes.')
```