Встала задача скачать из s3 бакета несколько сотен мелких файлов .csv и агрегировать их в более крупные файлы с ограничением по размеру 100МБ. Сделаем это с помощью python и пакета boto3.
Хранилище располагается в облаке Яндекса, поэтому для получения доступа следуем инструкции и создаём файлы:
~/.aws/config
~/.aws/credentials
В инструкции всё хорошо написано, повторяться не буду.
Рассмотрим скрипт, реализующий функционал:
import boto3
import io
from argparse import ArgumentParser
from datetime import datetime
session = boto3.session.Session()
s3 = session.client(
service_name='s3',
endpoint_url='https://storage.yandexcloud.net'
)
BUCKET = ''
def get_file_header():
return b"Display Name,URL,Data,Some Other Data\n"
def get_obj_batches(contents_list, max_size=100*1024*1024):
batch = []
batch_size = 0
for item in contents_list:
key = item['Key']
size = item['Size']
if size > max_size:
raise Exception(f"Object {key} size:{size} bigger than max_size {max_size}")
if batch_size + size > max_size:
yield batch
batch = [key]
batch_size = size
else:
batch.append(key)
batch_size += size
if batch:
yield batch
def merge_files(bucket, prefix=''):
response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
filename_prefix = f"{prefix or datetime.now()}"
header = get_file_header()
batch_no = 0
for names in get_obj_batches(response['Contents']):
filename_no = "%02d" % batch_no
filename = f"{filename_prefix}_{filename_no}.csv"
batch_no += 1
with open(filename, 'wb') as file:
file.write(header)
buf = io.BytesIO()
for obj in names:
print(obj)
s3.download_fileobj(bucket, obj, buf)
buf.seek(0)
file.write(buf.read())
buf.seek(0)
buf.truncate()
buf.close()
if __name__ == '__main__':
parser = ArgumentParser(description="Download and merge files from s3 bucket")
parser.add_argument('prefix')
args = parser.parse_args()
merge_files(BUCKET, args.prefix)
Скрипт можно сохранить в файл merge_csv.py, установить boto3 и запустить:
pip install boto3
python merge_csv.py report_2024-06-18
Скрипт принимает 1 аргумент - префикс для поиска объектов в бакете. Имя бакета неизменно в пределах окружения, поэтому захардкожено прямо в коде скрипта в переменной BUCKET. А примерно так выглядит содержимое бакета:
report_2024-06-18/chunk_0000.csv
report_2024-06-18/chunk_0001.csv
report_2024-06-18/chunk_0002.csv
report_2024-06-18/chunk_0003.csv
report_2024-06-18/chunk_0004.csv
...
Могу упомянуть несколько особенностей:
- Функция
get_obj_batches()
генерирует списки объектов таким образом, чтобы каждый список содержал объекты, совокупным объёмом данных не более 100МБ. Далее по каждому списку в отдельной итерации будут скачаны все объекты и объединены в один файл.
- Функция
s3.download_fileobj()
скачивает объект и записывает его в file-like буфер buf
. Не хотелось создавать временных файлов для записи, поэтому buf используется именно как буфер, содержимое которого усекается до 0 каждую итерацию.
- Заголовки в csv-шках s3 отсутствуют, поэтому конкатенировать их в один файл легко и удобно. Заголовок в файлы добавляет сам скрипт функцией
get_file_header()
. Важно, что она возвращает байты и что в конце имеется перенос строки \n
.
В результате мы получаем большие объединенные файлы. Вот вывод du -sh *csv
:
100M report_2024-06-18_00.csv
100M report_2024-06-18_01.csv
99M report_2024-06-18_01.csv
100M report_2024-06-18_01.csv
39M report_2024-06-18_01.csv
Заключение
Скрипт скачивает объекты с указанным префиксом, объединяет их в файлы размером не более 100МБ и добавляет строку-заголовок. Было бы неплохо прикрутить параллельное скачивание в несколько потоков. И это даже легко сделать уже в текущей реализации, однако лень ;)