Я скопировал некоторые данные из веб-источников и сохранил их в DataFrame pandas. Теперь, чтобы использовать мощные инструменты db, предоставляемые SQLAlchemy, я хочу преобразовать указанный DataFrame в объект Table () и в конечном итоге загрузить все данные в таблицу PostgreSQL. Если это практично, то каков реальный метод решения этой задачи?
Как вставить pandas DataFrame в таблицу PostgreSQL?
Ответы (4)
Если вы используете PostgreSQL 9.5 или новее, вы можете выполнить UPSERT, используя временную таблицу и оператор INSERT ... ON CONFLICT
:
import sqlalchemy as sa
# …
with engine.begin() as conn:
# step 0.0 - create test environment
conn.execute(sa.text("DROP TABLE IF EXISTS main_table"))
conn.execute(
sa.text(
"CREATE TABLE main_table (id int primary key, txt varchar(50))"
)
)
conn.execute(
sa.text(
"INSERT INTO main_table (id, txt) VALUES (1, 'row 1 old text')"
)
)
# step 0.1 - create DataFrame to UPSERT
df = pd.DataFrame(
[(2, "new row 2 text"), (1, "row 1 new text")], columns=["id", "txt"]
)
# step 1 - create temporary table and upload DataFrame
conn.execute(
sa.text(
"CREATE TEMPORARY TABLE temp_table (id int primary key, txt varchar(50))"
)
)
df.to_sql("temp_table", conn, index=False, if_exists="append")
# step 2 - merge temp_table into main_table
conn.execute(
sa.text("""\
INSERT INTO main_table (id, txt)
SELECT id, txt FROM temp_table
ON CONFLICT (id) DO
UPDATE SET txt = EXCLUDED.txt
"""
)
)
# step 3 - confirm results
result = conn.execute(sa.text("SELECT * FROM main_table ORDER BY id")).fetchall()
print(result) # [(1, 'row 1 new text'), (2, 'new row 2 text')]
Если у вас уже есть фреймворк pandas, вы можете использовать df.to_sql для передачи данных напрямую через SQLAlchemy
from sqlalchemy import create_engine
#create a connection from Postgre URI
cnxn = create_engine("postgresql+psycopg2://username:password@host:port/database")
#write dataframe to database
df.to_sql("my_table", con=cnxn, schema="myschema")
Вот мой код для массовой вставки и вставки при запросе обновления конфликта для postgresql из фрейма данных pandas:
Допустим, id - это уникальный ключ как для таблицы postgresql, так и для pandas df, и вы хотите вставлять и обновлять на основе этого идентификатора.
import pandas as pd
from sqlalchemy import create_engine, text
engine = create_engine(postgresql://username:pass@host:port/dbname)
query = text(f"""
INSERT INTO schema.table(name, title, id)
VALUES {','.join([str(i) for i in list(df.to_records(index=False))])}
ON CONFLICT (id)
DO UPDATE SET name= excluded.name,
title= excluded.title
""")
engine.execute(query)
Убедитесь, что ваши столбцы df должны быть в том же порядке, что и ваша таблица.
РЕДАКТИРОВАТЬ 1:
Благодаря комментарию Горда Томпсона я понял, что этот запрос не будет работать, если в столбцах есть одинарные кавычки. Поэтому вот исправление, если в столбцах есть одинарные кавычки:
import pandas as pd
from sqlalchemy import create_engine, text
df.name = df.name.str.replace("'", "''")
df.title = df.title.str.replace("'", "''")
engine = create_engine(postgresql://username:pass@host:port/dbname)
query = text("""
INSERT INTO author(name, title, id)
VALUES %s
ON CONFLICT (id)
DO UPDATE SET name= excluded.name,
title= excluded.title
""" % ','.join([str(i) for i in list(df.to_records(index=False))]).replace('"', "'"))
engine.execute(query)
name
или title
содержат одинарную кавычку. Пример здесь.
- person Gord Thompson; 04.12.2020
name
или title
содержит двойные кавычки. :(
- person Gord Thompson; 05.12.2020
Рассмотрите эту функцию, если ваш DataFrame и таблица SQL уже содержат одинаковые имена и типы столбцов. Преимущества:
- Хорошо, если вам нужно вставить длинный фрейм данных. (Дозирование)
- Избегайте написания длинных операторов sql в своем коде.
- Быстро
.
from sqlalchemy import Table
from sqlalchemy.engine.base import Engine as sql_engine
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.automap import automap_base
import pandas as pd
def upsert_database(list_input: pd.DataFrame, engine: sql_engine, table: str, schema: str) -> None:
if len(list_input) == 0:
return None
flattened_input = list_input.to_dict('records')
with engine.connect() as conn:
base = automap_base()
base.prepare(engine, reflect=True, schema=schema)
target_table = Table(table, base.metadata,
autoload=True, autoload_with=engine, schema=schema)
chunks = [flattened_input[i:i + 1000] for i in range(0, len(flattened_input), 1000)]
for chunk in chunks:
stmt = insert(target_table).values(chunk)
update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
conn.execute(stmt.on_conflict_do_update(
constraint=f'{table}_pkey',
set_=update_dict)
)