При работе с большими файлами их эффективная обработка может быть сложной задачей из-за их размера. В таких случаях может быть полезно разделить файл на более мелкие фрагменты и обрабатывать каждый фрагмент отдельно. Именно здесь вступает в игру код Python, который мы собираемся обсудить.

Код Python, который мы рассматриваем, предназначен для разделения большого файла на файлы меньшего размера с использованием нескольких потоков. Это хорошо продуманный код, который эффективно использует многопоточность для параллельного разделения файла, что значительно сокращает время обработки.

Прежде чем мы углубимся в код, давайте разберемся с некоторыми ключевыми понятиями, которые необходимы для понимания кода.

  • Потоки: в Python поток — это отдельный поток выполнения, который выполняется одновременно с основной программой. Каждый поток может выполнять отдельную задачу независимо, что позволяет программе выполнять несколько задач одновременно.
  • Ввод и вывод: код принимает путь к входному файлу, то есть путь к большому файлу, который необходимо разделить. Он также принимает путь к выходному каталогу, то есть путь к каталогу, в котором будут храниться файлы меньшего размера.
  • Разделение на фрагменты: код разбивает входной файл на более мелкие фрагменты, где каждый фрагмент представляет собой подмножество исходного файла. Количество фрагментов определяется количеством потоков, определенных в коде.
#Define the input and output file paths
input_file_path = 'largefile'
output_dir_path = 'output path'

# Create the output directory if it doesn't exist
if not os.path.exists(output_dir_path):
    os.makedirs(output_dir_path)

# Define the number of threads
num_threads = 50

# Define a function to write a chunk to a file
def write_chunk_to_file(chunk_lines, output_file_path):
    with open(output_file_path, 'wb') as output_file:
        output_file.writelines(chunk_lines)

Первый раздел кода определяет путь к входному файлу, путь к выходному каталогу и количество потоков, которые будут использоваться для обработки. В этом разделе определяется функция, которая записывает в файл фрагмент строк. Он принимает два аргумента: chunk_lines, представляющий собой список строк, которые должны быть записаны в файл, и output_file_path, являющийся путем к файлу, в который будут записаны строки.

def write_chunks_in_parallel():
    with open(input_file_path, 'rb') as input_file:
        # Calculate the number of lines in the input file
        num_lines = sum(1 for _ in input_file)
        print(num_lines)

        # Calculate the number of lines per thread
        lines_per_thread = num_lines // num_threads

        # Initialize the line counter
        line_counter = 0

        # Initialize the chunk counter
        chunk_counter = 1

        # Initialize a list to store the threads
        threads = []

        # Go back to the beginning of the file
        input_file.seek(0)
        input_file.readline()

        # Loop through the threads and create them
        for i in range(0,50):
            # Calculate the output file path for this thread
            output_file_path = os.path.join(output_dir_path, f'chunk_{chunk_counter}.txt')

            # Initialize the list of lines for this thread
            thread_lines = []

            # Loop through the lines and add them to this thread's list
            while len(thread_lines) < lines_per_thread:
                line = input_file.readline()
                if not line:
                    break
                thread_lines.append(line)

            # Create a thread to write this chunk of lines to the output file
            thread = threading.Thread(target=write_chunk_to_file, args=(thread_lines, output_file_path))

            # Start the thread
            thread.start()

            # Add the thread to the list of threads
            threads.append(thread)

            # Increment the line counter
            line_counter += len(thread_lines)

            # Increment the chunk counter
            chunk_counter += 1

        # Wait for all the threads to finish
        for thread in threads:
            thread.join()

        # Check if there are any remaining lines
        remaining_lines = input_file.readlines()
        if remaining_lines:
            # Calculate the output file path
            output_file_path = os.path.join(output_dir_path, f'chunk_{chunk_counter}.txt')

            # Write the remaining lines to the output file
            with open(output_file_path, 'wb') as output_file:
                output_file.writelines(remaining_lines)

Основная функция скрипта — write_chunks_in_parallel(). Эта функция открывает входной файл в двоичном режиме и вычисляет количество строк в файле с помощью функции sum(). Затем он вычисляет количество строк в потоке путем деления общего количества строк на количество потоков.

Затем функция инициализирует счетчики строк и фрагментов, а также список для хранения потоков. Он читает первую строку входного файла, чтобы пропустить строку заголовка, затем перебирает количество потоков и создает каждый поток. Для каждого потока он вычисляет путь к выходному файлу, инициализирует список для хранения строк для этого потока, считывает строки из входного файла и добавляет их в список, пока не будет достигнуто количество строк на поток. Затем он создает новый поток с помощью функции threading.Thread() и передает функцию write_chunk_to_file() в качестве целевой функции вместе со списком thread_lines и аргументами output_file_path.

Как только все потоки созданы, функция ожидает их завершения с помощью функции thread.join(). Если во входном файле остались строки, функция записывает их в новый выходной файл.

Заключение:

В этой статье мы рассмотрели сценарий Python, который использует многопоточность для разделения большого файла на файлы меньшего размера. Разбивая файл на более мелкие фрагменты и обрабатывая их параллельно, сценарий может значительно сократить время, необходимое для обработки больших файлов. Это может быть полезно в различных контекстах, таких как анализ данных, машинное обучение и многое другое.