null

Скачиваем и объединяем файлы из s3

Встала задача скачать из s3 бакета несколько сотен мелких файлов .csv и агрегировать их в более крупные файлы с ограничением по размеру 100МБ. Сделаем это с помощью python и пакета boto3.

Хранилище располагается в облаке Яндекса, поэтому для получения доступа следуем инструкции и создаём файлы:

~/.aws/config
~/.aws/credentials

В инструкции всё хорошо написано, повторяться не буду.

Рассмотрим скрипт, реализующий функционал:

import boto3
import io
from argparse import ArgumentParser
from datetime import datetime


session = boto3.session.Session()
s3 = session.client(
    service_name='s3',
    endpoint_url='https://storage.yandexcloud.net'
)

BUCKET = ''


def get_file_header():
    return  b"Display Name,URL,Data,Some Other Data\n"


def get_obj_batches(contents_list, max_size=100*1024*1024):
    batch = []
    batch_size = 0
    for item in contents_list:
        key = item['Key']
        size = item['Size']
        if size > max_size:
            raise Exception(f"Object {key} size:{size} bigger than max_size {max_size}")
        if batch_size + size > max_size:
            yield batch
            batch = [key]
            batch_size = size
        else:
            batch.append(key)
            batch_size += size
    if batch:
        yield batch


def merge_files(bucket, prefix=''):
    response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    filename_prefix = f"{prefix or datetime.now()}"
    header = get_file_header()
    batch_no = 0
    for names in get_obj_batches(response['Contents']):
        filename_no = "%02d" % batch_no
        filename = f"{filename_prefix}_{filename_no}.csv"
        batch_no += 1
        with open(filename, 'wb') as file:
            file.write(header)
            buf = io.BytesIO()
            for obj in names:
                print(obj)
                s3.download_fileobj(bucket, obj, buf)
                buf.seek(0)
                file.write(buf.read())
                buf.seek(0)
                buf.truncate()
            buf.close()


if __name__ == '__main__':
    parser = ArgumentParser(description="Download and merge files from s3 bucket")
    parser.add_argument('prefix')
    args = parser.parse_args()
    merge_files(BUCKET, args.prefix)

Скрипт можно сохранить в файл merge_csv.py, установить boto3 и запустить:

pip install boto3
python merge_csv.py report_2024-06-18

Скрипт принимает 1 аргумент - префикс для поиска объектов в бакете. Имя бакета неизменно в пределах окружения, поэтому захардкожено прямо в коде скрипта в переменной BUCKET. А примерно так выглядит содержимое бакета:

report_2024-06-18/chunk_0000.csv
report_2024-06-18/chunk_0001.csv
report_2024-06-18/chunk_0002.csv
report_2024-06-18/chunk_0003.csv
report_2024-06-18/chunk_0004.csv
...

Могу упомянуть несколько особенностей:

  • Функция get_obj_batches() генерирует списки объектов таким образом, чтобы каждый список содержал объекты, совокупным объёмом данных не более 100МБ. Далее по каждому списку в отдельной итерации будут скачаны все объекты и объединены в один файл.
  • Функция s3.download_fileobj() скачивает объект и записывает его в file-like буфер buf. Не хотелось создавать временных файлов для записи, поэтому buf используется именно как буфер, содержимое которого усекается до 0 каждую итерацию.
  • Заголовки в csv-шках s3 отсутствуют, поэтому конкатенировать их в один файл легко и удобно. Заголовок в файлы добавляет сам скрипт функцией get_file_header(). Важно, что она возвращает байты и что в конце имеется перенос строки \n.

В результате мы получаем большие объединенные файлы. Вот вывод du -sh *csv:

100M    report_2024-06-18_00.csv
100M    report_2024-06-18_01.csv
99M     report_2024-06-18_01.csv
100M    report_2024-06-18_01.csv
39M     report_2024-06-18_01.csv

Заключение

Скрипт скачивает объекты с указанным префиксом, объединяет их в файлы размером не более 100МБ и добавляет строку-заголовок. Было бы неплохо прикрутить параллельное скачивание в несколько потоков. И это даже легко сделать уже в текущей реализации, однако лень ;)