s3 Parallel Access with asyncio

Asked 2 years ago, Updated 2 years ago, 18 views

I'd like to load the shredded file in S3 and import it into a different database.

When I wrote the code of prefix listing and reading objects one by one without asyncio, it took me about 40 minutes to load 30,000 files, and the CPU used only about 20 percent, so I imagine that S3 access is probably a bottleneck.

Therefore, I would like to access S3 in 4 parallel using asyncio


Make the bucket list part
https://pypi.org/project/aioboto3/


Object Retrieval https://pypi.org/project/aiobotocore/

After referring to , if you get an error saying it's a child routine type while moving it, insert the wait appropriately.
It's working, but it doesn't seem to be parallel at all

import asyncio
import aioboto3

async def get_object(s3, bucket, key):
  print("start get object")

  obj=awights3.Object(bucket,key)
  response = wait obj.get()
  async with response ['Body'] as stream:
    gz_data=wait stream.read()

  print("end get object")

async def list_bucket():
  ### list json###
  bucket_name = 'test-bucket'
  prefix='prefix/'

  async with aioboto3.resource("s3") as s3:
    bucket=awights3.Bucket(bucket_name)#<-------------------
    sem=asyncio.Semaphore(4)
    async for s3_object in bucket.objects.filter (Prefix=prefix):
      wait sem.acquire()
      wait get_object(s3, bucket_name, s3_object.key)
      sem.release()

defmain():
  loop=asyncio.get_event_loop()
  loop.run_until_complete(list_bucket())
  
if__name__=='__main__':
  main()

Put up to 4 in parallel in get_object

obj=awights3.Object(bucket,key)
  response = wait obj.get()
  async with response ['Body'] as stream:
    gz_data=wait stream.read()

I'm going to get some data from S3 somewhere here, so I should block it and let go of the processing.
Run to

start get object
start get object
start get object
start get object
end get object
  :

First of all, I would like you to issue only s3 object acquisition requests in 4 parallel.
When you actually run it

start get object
end get object
start get object
end get object
  :

It looks like get_object is only running in series

I've only tried parallel processing in thread fork join format.
It's my first time using asyncio, so I don't really understand it.
wait get_object(s3, bucket_name, s3_object.key)
You don't know why wait is necessary here, do you?

Don't wait for the end of the get_object and move on to the next loop
I'd like you to run the following get_object.
If you don't do this, they won't run it inside the get_object in the first place

How do I access s3 in parallel with asyncio?

Additional information

In the create_task you told me in the comment
The parts to retrieve objects from s3 were created in parallel, but
I'm having trouble doing create_task if I don't read the page first.

If the number of files reaches tens of thousands, getting tens of thousands of key arrays first is a waste of memory, so
I'd like to read the objects on each page and discard the keys I read.

async for s3_object in bucket.objects.filter (Prefix=prefix):

I think we need to break down the part of , but what is the difference between async for and regular for?

python

2022-09-30 10:22

1 Answers

How about using asyncio.create_task()
https://docs.python.org/ja/3/library/asyncio-task.html


2022-09-30 10:22

If you have any answers or tips


© 2024 OneMinuteCode. All rights reserved.