Прежде чем углубляться в детали нашего процесса миграции данных, убедитесь, что у вас есть следующие предварительные условия:
- Уверенное понимание Python
- Базовые знания баз данных
- Python и pip установлены в вашей системе
Чтобы убедиться, что Python и pip установлены, выполните в терминале следующие команды:
python --version pip --version
Настройка среды
Начнем с настройки окружения:
- Создайте новый каталог, в котором вы будете работать над сценарием миграции.
- Внутри этого каталога вручную создайте файл с именем
requirements.txt
. Альтернативно вы можете создать этот файл, запустив:
pip freeze > requirements.txt
3. Обновите requirements.txt
, добавив следующие зависимости:
google-cloud-bigquery==3.10.0 numpy==1.25.2
Вы можете настроить номера версий в соответствии с вашими потребностями.
4. Аутентифицируйте скрипт, выполнив:
gcloud auth login
Эта команда проведет вас через процесс аутентификации, гарантируя, что скрипт сможет получить доступ к вашим данным BigQuery.
Создание проекта JSON
Теперь давайте создадим файл JSON, который будет служить основой для нашего сценария миграции. Этот файл будет определять исходную таблицу BigQuery, целевую таблицу PostgreSQL и сопоставления столбцов.
[ { "BigQueryTable": "AuthUsers", "PostgresTable": "Users", "unique_column": "id", "chunks": 100, "col": [ { "source_column": "id", "destination_column": "id" }, { "source_column": "firstname", "custom_hook": "first_name" }, { "source_column": "lastname", "destination_column": "last_name" }, { "source_column": "emailaddress", "destination_column": "email_address" }, { "source_column": "password", "destination_column": "password" }, { "source_column": "companyname", "destination_column": "company" } ] } ]
В этой структуре JSON определите имена исходной и целевой таблиц, укажите уникальный столбец, установите количество фрагментов для извлечения данных и при необходимости сопоставьте исходные столбцы с их целевыми столбцами.
Сценарий миграции
Затем создайте скрипт Python с именем migration_script.py
, который будет использовать информацию из схемы JSON и BigQuery для создания файлов SQL для импорта PostgreSQL. Вот сценарий:
import os import json import numpy as np import concurrent.futures from google.cloud import bigquery BIGQUERY_TABLE = "bigquery_table" POSTGRES_TABLE = "postgres_table" COLUMNS_STRUCTURE = "columns_structure" UNIQUE_COLUMN = "unique_column" PROJECT_ID = "your_project_id" PROJECT_ID = "your_project_id" CHUNKS = "no_of_chunks" def read_json_data(json_file_path): """Read JSON migration data from a file.""" with open(json_file_path) as f: return json.load(f) def upsert(table, newdict): """Generate an upsert SQL query.""" keys = list(newdict.keys()) values = list(newdict.values()) sql = [ f"INSERT INTO {table} ({', '.join(keys)}) VALUES ({', '.join(values)});" ] return "".join(sql) def divide_chunks(lst, n): """Divide a list into chunks of a specified size.""" for i in range(0, len(lst), n): yield lst[i:i + n] def process_chunk(chunk, source_table, destination_table, columns, project_id, unique_column): """Process a chunk of data and write SQL queries to files.""" limit = np.subtract(chunk["end"], chunk["start"]) offset = chunk["start"] client = bigquery.Client(project=project_id) query = f"SELECT DISTINCT * FROM your-bigquery-dataset.{source_table}" if unique_column else f"SELECT * FROM bmg-bigquery-test.prosaic.{source_table}" query_job = client.query(f"{query} LIMIT {limit} OFFSET {offset}") rows = query_job.result() output_folder = f"./sql_dump/{destination_table}" start_index = chunk["start"] end_index = chunk["end"] if not os.path.isdir(output_folder): os.makedirs(output_folder) sql_file_name = os.path.join(output_folder, f"{destination_table}_{start_index}_{end_index}.sql") with open(sql_file_name, "w") as file_sql: for _, row in enumerate(rows): data_dict = {} for col in columns: val = row[col["source_column"]] col_value = col.get("destination_column", col["source_column"]) data_dict[col_value] = str(val) query = upsert(destination_table, data_dict) file_sql.write(f"{query}\n\n") def main(): json_file_path = input("Enter File Path: ") if not json_file_path: raise Exception("Oops! JSON migration file path required.") json_data = read_json_data(json_file_path) for data in json_data: source_table = data[BIGQUERY_TABLE] destination_table = data[POSTGRES_TABLE] unique_column = data.get(UNIQUE_COLUMN) columns = data[COLUMNS_STRUCTURE] no_of_chunks = data[CHUNKS] client = bigquery.Client(project=PROJECT_ID) query_params = f"DISTINCT {unique_column}" if unique_column else "*" query = f"SELECT COUNT({query_params}) as total_rows FROM your-bigquery-dataset.{source_table}" query_job = client.query(query) rows = query_job.result() total_rows = np.sum([int(row["total_rows"]) for row in rows]) each_loop_records = int(total_rows / no_of_chunks) starts = np.arange(0, no_of_chunks * each_loop_records, each_loop_records) ends = np.append(starts[1:], total_rows) chunk_array = [{"start": start, "end": end} for start, end in zip(starts, ends)] with concurrent.futures.ThreadPoolExecutor(5) as executor: futures = [ executor.submit( process_chunk, chunk, source_table, destination_table, columns, PROJECT_ID, unique_column, ) for chunk in chunk_array ] concurrent.futures.wait(futures) print("Migration Script Completed Successfully!") if __name__ == "__main__": main()
Запуск миграции
- Выполните сценарий миграции, указав путь к файлу схемы JSON при появлении соответствующего запроса:
python migration_script.py
- Расслабьтесь и наблюдайте, как скрипт эффективно извлекает данные из BigQuery по частям и генерирует файлы SQL для PostgreSQL.
- Далее остается только импортировать эти SQL-файлы в PostgreSQL вручную или с помощью скрипта.
Заключение
В заключение, эффективная миграция данных из BigQuery в PostgreSQL имеет решающее значение для компаний, стремящихся максимально эффективно использовать свои данные. Благодаря хорошо структурированной стратегии миграции вы можете свести к минимуму время простоя, сохранить целостность данных и плавно перенести данные в новое место.
Выполнив шаги, описанные в этой статье, и используя предоставленный сценарий, вы будете хорошо подготовлены к эффективной миграции данных.