Как вставить pandas DataFrame в таблицу PostgreSQL?

Я скопировал некоторые данные из веб-источников и сохранил их в DataFrame pandas. Теперь, чтобы использовать мощные инструменты db, предоставляемые SQLAlchemy, я хочу преобразовать указанный DataFrame в объект Table () и в конечном итоге загрузить все данные в таблицу PostgreSQL. Если это практично, то каков реальный метод решения этой задачи?


person Nathan Benton    schedule 22.04.2020    source источник


Ответы (4)


Если вы используете PostgreSQL 9.5 или новее, вы можете выполнить UPSERT, используя временную таблицу и оператор INSERT ... ON CONFLICT:

import sqlalchemy as sa

# …

with engine.begin() as conn:
    # step 0.0 - create test environment
    conn.execute(sa.text("DROP TABLE IF EXISTS main_table"))
    conn.execute(
        sa.text(
            "CREATE TABLE main_table (id int primary key, txt varchar(50))"
        )
    )
    conn.execute(
        sa.text(
            "INSERT INTO main_table (id, txt) VALUES (1, 'row 1 old text')"
        )
    )
    # step 0.1 - create DataFrame to UPSERT
    df = pd.DataFrame(
        [(2, "new row 2 text"), (1, "row 1 new text")], columns=["id", "txt"]
    )
    
    # step 1 - create temporary table and upload DataFrame
    conn.execute(
        sa.text(
            "CREATE TEMPORARY TABLE temp_table (id int primary key, txt varchar(50))"
        )
    )
    df.to_sql("temp_table", conn, index=False, if_exists="append")

    # step 2 - merge temp_table into main_table
    conn.execute(
        sa.text("""\
            INSERT INTO main_table (id, txt) 
            SELECT id, txt FROM temp_table
            ON CONFLICT (id) DO
                UPDATE SET txt = EXCLUDED.txt
            """
        )
    )

    # step 3 - confirm results
    result = conn.execute(sa.text("SELECT * FROM main_table ORDER BY id")).fetchall()
    print(result)  # [(1, 'row 1 new text'), (2, 'new row 2 text')]
person Gord Thompson    schedule 14.06.2020

Если у вас уже есть фреймворк pandas, вы можете использовать df.to_sql для передачи данных напрямую через SQLAlchemy

from sqlalchemy import create_engine
#create a connection from Postgre URI
cnxn = create_engine("postgresql+psycopg2://username:password@host:port/database")
#write dataframe to database
df.to_sql("my_table", con=cnxn, schema="myschema")
person Nathan Mathews    schedule 22.04.2020
comment
Действительно, это, безусловно, жизнеспособные варианты, и спасибо за ваш вклад! Однако я хочу обновить данные, а не просто вставить или заменить таблицу. Вот где я думаю, что sqlalchemy может быть лучшим вариантом. - person Nathan Benton; 22.04.2020
comment
stackoverflow.com/questions/25955200/ Может быть, вы могли бы использовать эту оболочку для SqlAlchemy Insert, которая динамически реализует upsert с использованием предложения on commit? - person Nathan Mathews; 22.04.2020
comment
Да, это работает, только если у вас есть объект sqlalchemy Table (). Для этого мне сначала нужно преобразовать df pandas в объект Table (). - это главное и первое, что я хочу сделать - person Nathan Benton; 22.04.2020

Вот мой код для массовой вставки и вставки при запросе обновления конфликта для postgresql из фрейма данных pandas:

Допустим, id - это уникальный ключ как для таблицы postgresql, так и для pandas df, и вы хотите вставлять и обновлять на основе этого идентификатора.

import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine(postgresql://username:pass@host:port/dbname)
query = text(f""" 
                INSERT INTO schema.table(name, title, id)
                VALUES {','.join([str(i) for i in list(df.to_records(index=False))])}
                ON CONFLICT (id)
                DO  UPDATE SET name= excluded.name,
                               title= excluded.title
         """)
engine.execute(query)

Убедитесь, что ваши столбцы df должны быть в том же порядке, что и ваша таблица.

РЕДАКТИРОВАТЬ 1:

Благодаря комментарию Горда Томпсона я понял, что этот запрос не будет работать, если в столбцах есть одинарные кавычки. Поэтому вот исправление, если в столбцах есть одинарные кавычки:

import pandas as pd
from sqlalchemy import create_engine, text

df.name = df.name.str.replace("'", "''")
df.title = df.title.str.replace("'", "''")
engine = create_engine(postgresql://username:pass@host:port/dbname)
query = text(""" 
            INSERT INTO author(name, title, id)
            VALUES %s
            ON CONFLICT (id)
            DO  UPDATE SET name= excluded.name,
                           title= excluded.title
     """ % ','.join([str(i) for i in list(df.to_records(index=False))]).replace('"', "'"))
engine.execute(query)
person Ekrem Gurdal    schedule 04.12.2020
comment
Проблема с SQL-инъекцией: приведенный выше код завершится ошибкой, если name или title содержат одинарную кавычку. Пример здесь. - person Gord Thompson; 04.12.2020
comment
@GordThompson спасибо за ваш комментарий. Я отредактировал свое решение выше - person Ekrem Gurdal; 05.12.2020
comment
Теперь код не работает, если name или title содержит двойные кавычки. :( - person Gord Thompson; 05.12.2020

Рассмотрите эту функцию, если ваш DataFrame и таблица SQL уже содержат одинаковые имена и типы столбцов. Преимущества:

  • Хорошо, если вам нужно вставить длинный фрейм данных. (Дозирование)
  • Избегайте написания длинных операторов sql в своем коде.
  • Быстро

.

from sqlalchemy import Table
from sqlalchemy.engine.base import Engine as sql_engine
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.automap import automap_base
import pandas as pd


def upsert_database(list_input: pd.DataFrame, engine: sql_engine, table: str, schema: str) -> None:
    if len(list_input) == 0:
        return None
    flattened_input = list_input.to_dict('records')
    with engine.connect() as conn:
        base = automap_base()
        base.prepare(engine, reflect=True, schema=schema)
        target_table = Table(table, base.metadata,
                             autoload=True, autoload_with=engine, schema=schema)
        chunks = [flattened_input[i:i + 1000] for i in range(0, len(flattened_input), 1000)]
        for chunk in chunks:
            stmt = insert(target_table).values(chunk)
            update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
            conn.execute(stmt.on_conflict_do_update(
                constraint=f'{table}_pkey',
                set_=update_dict)
            )
person Carl Kristensen    schedule 19.02.2021
comment
Я хочу использовать это, но меня немного пугают все новые для меня функции sqlalchemy. Если у вас когда-нибудь будет возможность объяснить или прокомментировать этот ответ, я думаю, он может быть отличным для тех из нас, кому нужно выполнить апсерт из фреймов данных. - person autonopy; 22.07.2021