Прежде чем углубляться в детали нашего процесса миграции данных, убедитесь, что у вас есть следующие предварительные условия:

  • Уверенное понимание Python
  • Базовые знания баз данных
  • Python и pip установлены в вашей системе

Чтобы убедиться, что Python и pip установлены, выполните в терминале следующие команды:

python --version
pip --version

Настройка среды

Начнем с настройки окружения:

  1. Создайте новый каталог, в котором вы будете работать над сценарием миграции.
  2. Внутри этого каталога вручную создайте файл с именем requirements.txt. Альтернативно вы можете создать этот файл, запустив:
pip freeze > requirements.txt

3. Обновите requirements.txt, добавив следующие зависимости:

google-cloud-bigquery==3.10.0
numpy==1.25.2

Вы можете настроить номера версий в соответствии с вашими потребностями.

4. Аутентифицируйте скрипт, выполнив:

gcloud auth login

Эта команда проведет вас через процесс аутентификации, гарантируя, что скрипт сможет получить доступ к вашим данным BigQuery.

Создание проекта JSON

Теперь давайте создадим файл JSON, который будет служить основой для нашего сценария миграции. Этот файл будет определять исходную таблицу BigQuery, целевую таблицу PostgreSQL и сопоставления столбцов.

[
  {
    "BigQueryTable": "AuthUsers",
    "PostgresTable": "Users",
    "unique_column": "id",
    "chunks": 100,
    "col": [
      {
        "source_column": "id",
        "destination_column": "id"
      },
      {
        "source_column": "firstname",
        "custom_hook": "first_name"
      },
     
      {
        "source_column": "lastname",
        "destination_column": "last_name"
      },
      {
        "source_column": "emailaddress",
        "destination_column": "email_address"
      },
      {
        "source_column": "password",
        "destination_column": "password"
      },
      {
        "source_column": "companyname",
        "destination_column": "company"
      }
    ]
  }
]

В этой структуре JSON определите имена исходной и целевой таблиц, укажите уникальный столбец, установите количество фрагментов для извлечения данных и при необходимости сопоставьте исходные столбцы с их целевыми столбцами.

Сценарий миграции

Затем создайте скрипт Python с именем migration_script.py, который будет использовать информацию из схемы JSON и BigQuery для создания файлов SQL для импорта PostgreSQL. Вот сценарий:

import os
import json
import numpy as np
import concurrent.futures
from google.cloud import bigquery

BIGQUERY_TABLE = "bigquery_table"
POSTGRES_TABLE = "postgres_table"
COLUMNS_STRUCTURE = "columns_structure"
UNIQUE_COLUMN = "unique_column"
PROJECT_ID = "your_project_id"
PROJECT_ID = "your_project_id"
CHUNKS = "no_of_chunks"

def read_json_data(json_file_path):
    """Read JSON migration data from a file."""
    with open(json_file_path) as f:
        return json.load(f)

def upsert(table, newdict):
    """Generate an upsert SQL query."""
    keys = list(newdict.keys())
    values = list(newdict.values())
    sql = [
        f"INSERT INTO {table} ({', '.join(keys)}) VALUES ({', '.join(values)});"
    ]
    return "".join(sql)

def divide_chunks(lst, n):
    """Divide a list into chunks of a specified size."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

def process_chunk(chunk, source_table, destination_table, columns, project_id, unique_column):
    """Process a chunk of data and write SQL queries to files."""
    limit = np.subtract(chunk["end"], chunk["start"])
    offset = chunk["start"]
    client = bigquery.Client(project=project_id)
    query = f"SELECT DISTINCT * FROM your-bigquery-dataset.{source_table}" if unique_column else f"SELECT * FROM bmg-bigquery-test.prosaic.{source_table}"
    query_job = client.query(f"{query} LIMIT {limit} OFFSET {offset}")
    rows = query_job.result()

    output_folder = f"./sql_dump/{destination_table}"
    start_index = chunk["start"]
    end_index = chunk["end"]

    if not os.path.isdir(output_folder):
        os.makedirs(output_folder)

    sql_file_name = os.path.join(output_folder, f"{destination_table}_{start_index}_{end_index}.sql")

    with open(sql_file_name, "w") as file_sql:
        for _, row in enumerate(rows):
            data_dict = {}
            for col in columns:
                val = row[col["source_column"]]
                col_value = col.get("destination_column", col["source_column"])
                data_dict[col_value] = str(val)
            query = upsert(destination_table, data_dict)
            file_sql.write(f"{query}\n\n")

def main():
    json_file_path = input("Enter File Path: ")

    if not json_file_path:
        raise Exception("Oops! JSON migration file path required.")

    json_data = read_json_data(json_file_path)

    for data in json_data:
        source_table = data[BIGQUERY_TABLE]
        destination_table = data[POSTGRES_TABLE]
        unique_column = data.get(UNIQUE_COLUMN)
        columns = data[COLUMNS_STRUCTURE]
        no_of_chunks = data[CHUNKS]

        client = bigquery.Client(project=PROJECT_ID)
        query_params = f"DISTINCT {unique_column}" if unique_column else "*"
        query = f"SELECT COUNT({query_params}) as total_rows FROM your-bigquery-dataset.{source_table}"
        query_job = client.query(query)
        rows = query_job.result()
        total_rows = np.sum([int(row["total_rows"]) for row in rows])
        each_loop_records = int(total_rows / no_of_chunks)

        starts = np.arange(0, no_of_chunks * each_loop_records, each_loop_records)
        ends = np.append(starts[1:], total_rows)
        chunk_array = [{"start": start, "end": end} for start, end in zip(starts, ends)]

        with concurrent.futures.ThreadPoolExecutor(5) as executor:
            futures = [
                executor.submit(
                    process_chunk,
                    chunk,
                    source_table,
                    destination_table,
                    columns,
                    PROJECT_ID,
                    unique_column,
                )
                for chunk in chunk_array
            ]
            concurrent.futures.wait(futures)

        print("Migration Script Completed Successfully!")

if __name__ == "__main__":
    main()

Запуск миграции

  • Выполните сценарий миграции, указав путь к файлу схемы JSON при появлении соответствующего запроса:
python migration_script.py
  • Расслабьтесь и наблюдайте, как скрипт эффективно извлекает данные из BigQuery по частям и генерирует файлы SQL для PostgreSQL.
  • Далее остается только импортировать эти SQL-файлы в PostgreSQL вручную или с помощью скрипта.

Заключение

В заключение, эффективная миграция данных из BigQuery в PostgreSQL имеет решающее значение для компаний, стремящихся максимально эффективно использовать свои данные. Благодаря хорошо структурированной стратегии миграции вы можете свести к минимуму время простоя, сохранить целостность данных и плавно перенести данные в новое место.

Выполнив шаги, описанные в этой статье, и используя предоставленный сценарий, вы будете хорошо подготовлены к эффективной миграции данных.