Ограничение дочерних коллекций в начальном запросе sqlalchemy

Я создаю API, который может возвращать дочерние ресурсы, если пользователь этого запрашивает. Например, у user есть messages. Я хочу, чтобы запрос мог ограничить количество возвращаемых объектов message.

Я нашел полезный совет об ограничении количества объектов в дочерних коллекциях здесь. В основном, это указывает на следующий поток:

class User(...):
    # ...
    messages = relationship('Messages', order_by='desc(Messages.date)', lazy='dynamic')

user = User.query.one()
users.messages.limit(10)

Мой вариант использования включает возврат иногда большого количества пользователей.

Если бы я последовал совету в этой ссылке и использовал .limit(), мне пришлось бы перебрать всю коллекцию пользователей, вызывающих .limit() для каждого из них. Это гораздо менее эффективно, чем, скажем, использование LIMIT в исходном выражении sql, которое создало коллекцию.

Мой вопрос заключается в том, можно ли с помощью декларативного метода эффективно (N + 0) загружать большую коллекцию объектов, ограничивая при этом количество дочерних элементов в своих дочерних коллекциях с помощью sqlalchemy?

ОБНОВЛЕНИЕ

Чтобы было ясно, ниже я пытаюсь избежать.

users = User.query.all()
messages = {}
for user in users:
    messages[user.id] = user.messages.limit(10).all()

Я хочу сделать что-то большее, например:

users = User.query.option(User.messages.limit(10)).all()

person melchoir55    schedule 01.05.2017    source источник


Ответы (4)


Этот ответ исходит от Майка Байера из группы Google sqlalchemy. Я размещаю его здесь, чтобы помочь людям: TLDR: я использовал version 1 ответа Майка, чтобы решить свою проблему, потому что в этом случае у меня нет внешних ключей, участвующих в этой связи, и поэтому я не могу использовать LATERAL. Версия 1 работала отлично, но обязательно обратите внимание на эффект offset. Это сбило меня с толку во время тестирования на некоторое время, потому что я не заметил, что для него было установлено что-то отличное от 0.

Блок кода для версии 1:

subq = s.query(Messages.date).\
    filter(Messages.user_id == User.id).\
    order_by(Messages.date.desc()).\
    limit(1).offset(10).correlate(User).as_scalar()

q = s.query(User).join(
    Messages,
    and_(User.id == Messages.user_id, Messages.date > subq)
).options(contains_eager(User.messages))

Ответ Майка, поэтому вам следует игнорировать, использует ли он «декларативный», который не имеет ничего общего с запросами, и фактически сначала игнорируйте и Query, потому что в первую очередь это проблема SQL. Вам нужен один оператор SQL, который делает это. Какой запрос в SQL будет загружать множество строк из первичной таблицы, соединенных с первыми десятью строками вторичной таблицы для каждой первичной?

LIMIT сложна, потому что на самом деле она не является частью обычного расчета «реляционной алгебры». Это вне этого, потому что это искусственное ограничение на количество строк. Например, моя первая мысль о том, как это сделать, была неправильной:

    select * from users left outer join (select * from messages limit 10) as anon_1 on users.id = anon_1.user_id

Это неправильно, потому что он получает только первые десять сообщений в совокупности, без учета пользователя. Мы хотим получить первые десять сообщений для каждого пользователя, а значит, нам нужно сделать это «выбрать из лимита сообщений 10» индивидуально для каждого пользователя. То есть надо как-то соотносить. Хотя коррелированный подзапрос обычно не разрешен как элемент FROM и разрешен только как выражение SQL, он может возвращать только один столбец и одну строку; мы обычно не можем ПРИСОЕДИНИТЬСЯ к коррелированному подзапросу в простом ванильном SQL. Однако мы можем сопоставить предложение ON в JOIN, чтобы сделать это возможным в ванильном SQL.

Но сначала, если мы работаем с современной версией Postgresql, мы можем нарушить это обычное правило корреляции и использовать ключевое слово LATERAL, которое разрешает корреляцию в предложении FROM. LATERAL поддерживается только современными версиями Postgresql, и это упрощает задачу:

    select * from users left outer join lateral
    (select * from message where message.user_id = users.id order by messages.date desc limit 10) as anon1 on users.id = anon_1.user_id

мы поддерживаем ключевое слово LATERAL. Вышеприведенный запрос выглядит так:

subq = s.query(Messages).\
    filter(Messages.user_id == User.id).\
    order_by(Messages.date.desc()).limit(10).subquery().lateral()

q = s.query(User).outerjoin(subq).\
     options(contains_eager(User.messages, alias=subq))

Обратите внимание, что выше, чтобы ВЫБРАТЬ пользователей и сообщения и создать их в коллекции User.messages, необходимо использовать параметр «contains_eager()», и для этого «динамический» должен исчезнуть. Это не единственный вариант, вы можете, например, построить второе отношение для User.messages, которое не имеет «динамического», или вы можете просто загрузить из запроса (пользователь, сообщение) отдельно и организовать кортежи результатов по мере необходимости.

если вы не используете Postgresql или версию Postgresql, которая не поддерживает LATERAL, вместо этого корреляция должна быть включена в предложение ON соединения. SQL выглядит так:

select * from users left outer join messages on
users.id = messages.user_id and messages.date > (select date from messages where messages.user_id = users.id order by date desc limit 1 offset 10)

Здесь, чтобы втиснуть туда LIMIT, мы на самом деле проходим первые 10 строк с помощью OFFSET, а затем выполняем LIMIT 1, чтобы получить дату, представляющую нижнюю границу даты, которую мы хотим для каждого пользователя. Затем мы должны объединиться при сравнении этой даты, что может быть дорого, если этот столбец не проиндексирован, а также может быть неточным, если есть повторяющиеся даты.

Этот запрос выглядит так:

subq = s.query(Messages.date).\
    filter(Messages.user_id == User.id).\
    order_by(Messages.date.desc()).\
    limit(1).offset(10).correlate(User).as_scalar()

q = s.query(User).join(
    Messages,
    and_(User.id == Messages.user_id, Messages.date >= subq)
).options(contains_eager(User.messages))

Я не доверяю такого рода запросам без хорошего теста, поэтому приведенный ниже POC включает обе версии, включая проверку работоспособности.

from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
import datetime

Base = declarative_base()


class User(Base):
    __tablename__ = 'user'
    id = Column(Integer, primary_key=True)
    messages = relationship(
        'Messages', order_by='desc(Messages.date)')

class Messages(Base):
    __tablename__ = 'message'
    id = Column(Integer, primary_key=True)
    user_id = Column(ForeignKey('user.id'))
    date = Column(Date)

e = create_engine("postgresql://scott:tiger@localhost/test", echo=True)
Base.metadata.drop_all(e)
Base.metadata.create_all(e)

s = Session(e)

s.add_all([
    User(id=i, messages=[
        Messages(id=(i * 20) + j, date=datetime.date(2017, 3, j))
        for j in range(1, 20)
    ]) for i in range(1, 51)
])

s.commit()

top_ten_dates = set(datetime.date(2017, 3, j) for j in range(10, 20))


def run_test(q):
    all_u = q.all()
    assert len(all_u) == 50
    for u in all_u:

        messages = u.messages
        assert len(messages) == 10

        for m in messages:
            assert m.user_id == u.id

        received = set(m.date for m in messages)

        assert received == top_ten_dates

# version 1.   no LATERAL

s.close()

subq = s.query(Messages.date).\
    filter(Messages.user_id == User.id).\
    order_by(Messages.date.desc()).\
    limit(1).offset(10).correlate(User).as_scalar()

q = s.query(User).join(
    Messages,
    and_(User.id == Messages.user_id, Messages.date > subq)
).options(contains_eager(User.messages))

run_test(q)

# version 2.  LATERAL

s.close()

subq = s.query(Messages).\
    filter(Messages.user_id == User.id).\
    order_by(Messages.date.desc()).limit(10).subquery().lateral()

q = s.query(User).outerjoin(subq).\
    options(contains_eager(User.messages, alias=subq))

run_test(q)
person melchoir55    schedule 02.05.2017
comment
Это потрясающе, я сейчас работаю над этим в своем коде. Я чувствую, что это может быть на самом деле не правильно. Это работает только потому, что даты ваших сообщений одинаковы для всех пользователей? Подзапрос выглядит так, как будто он дает 1 дату назад, смещенную на 10 из списка DESC. Все ваши сообщения помещаются в этот блок и, следовательно, возвращаются из внешнего запроса. Я все еще экспериментирую со смешанными результатами - person cgmckeever; 07.09.2019
comment
Мое лучшее предположение здесь заключается в том, что если у вас есть сообщения, попадающие в диапазон дат, они будут включать всех соответствующих пользователей/сообщений. Если нет сообщений новее самого последнего, то он не вернет ни одного или меньше желаемых 50 (поскольку их будет не так много для извлечения). - person cgmckeever; 07.09.2019
comment
хорошо .. это какой-то сложный шазз .. Еще раз спасибо. Похоже, что там есть какая-то магия, которая выглядит на основе даты сообщений для каждого пользователя. Это потрясающе. Где меня сбивает с толку, если смещение больше, чем количество сообщений для этого пользователя, кажется, что оно возвращается пустым. Там может быть способ сделать пункт или или что-то в этом роде.. веселая суббота - person cgmckeever; 07.09.2019

Если вы примените лимит, а затем вызовете для него .all(), вы получите все объекты один раз, и он не будет получать объекты один за другим, вызывая упомянутую вами проблему с производительностью.

просто примените ограничение и получите все объекты.

users = User.query.limit(50).all()
print(len(users))
>>50

Или для дочерних объектов/отношений

user = User.query.one()
all_messages = user.messages.limit(10).all()


users = User.query.all()
messages = {}
for user in users:
    messages[user.id] = user.messages.limit(10).all()
person Zohaib Ijaz    schedule 01.05.2017
comment
Извините за путаницу. Я не хочу ограничивать количество users. Я хочу ограничить количество messages в user.messages (дочерняя коллекция до users). - person melchoir55; 02.05.2017
comment
То же самое относится к ленивым дочерним коллекциям/отношениям - person Zohaib Ijaz; 02.05.2017
comment
Чтобы быть более ясным, я хочу получить all() пользователей, и для каждого отдельного возвращаемого объекта я хочу, чтобы количество сообщений было ограничено 10 (например). - person melchoir55; 02.05.2017
comment
Извините за продолжающуюся путаницу. Я обновил свой вопрос, чтобы быть более ясным. - person melchoir55; 02.05.2017
comment
Вам не нужно получать все пользовательские объекты один за другим, чтобы применить ограничение к дочерним элементам. просто получите всех пользователей, а затем примените ограничение ко всем сообщениям. Вы не можете получить 10 сообщений с запросом пользователя. Вы должны повторить - person Zohaib Ijaz; 02.05.2017

Итак, я думаю, вам нужно будет загрузить сообщения во втором запросе, а затем каким-то образом связать их с вашими пользователями. Следующее зависит от базы данных; как обсуждается в этом вопросе, mysql делает не поддерживает запросы с ограничениями, но sqlite, по крайней мере, проанализирует запрос. Я не смотрел на план, чтобы увидеть, если он сделал хорошую работу. Следующий код найдет все нужные вам объекты сообщения. Затем вам нужно связать их с пользователями.
Я проверил это, чтобы убедиться, что он создает запрос, который sqlite может анализировать; Я не подтверждал, что sqlite или любая другая база данных правильно выполняет этот запрос. Мне пришлось немного схитрить и использовать текстовый примитив для ссылки на внешний столбец user.id в выборе, потому что SQLAlchemy постоянно хотел включить дополнительное соединение с пользователями во внутреннем подзапросе выбора.

from sqlalchemy import Column, Integer, String, ForeignKey, alias
from sqlalchemy.sql import text

from sqlalchemy.orm import Session
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key = True)
    name = Column(String)

class Message(Base):
    __tablename__ = 'messages'
    user_id = Column(Integer, ForeignKey(User.id), nullable = False)
    id = Column(Integer, primary_key = True)


s = Session()
m1 = alias(Message.__table__)

user_query = s.query(User) # add any user filtering you want
inner_query = s.query(m1.c.id).filter(m1.c.user_id == text('users.id')).limit(10)
all_messages_you_want = s.query(Message).join(User).filter(Message.id.in_(inner_query))

Чтобы связать сообщения с пользователями, вы можете сделать что-то вроде следующего, предполагая, что ваше сообщение имеет отношение пользователя, а ваши пользовательские объекты имеют метод got_child_message, который делает для этого все, что вам нравится.

users_resulting = user_query.all() #load objects into session and hold a reference
for m in all_messages_you_want: m.user.got_child_message(m)

Поскольку у вас уже есть пользователи в сеансе и поскольку отношение находится в первичном ключе пользователя, m.user разрешается в query.get по карте идентификаторов. Я надеюсь, что это поможет вам получить где-то.

person Sam Hartman    schedule 02.05.2017

Ответ @melchoirs - лучший. Я в основном ставлю это здесь для будущего

Я поиграл с приведенным выше ответом, и он работает, мне это было нужно больше, чтобы ограничить количество возвращаемых ассоциаций перед переходом в сериализатор Marshmallow.

Некоторые вопросы для уточнения:

  • подзапрос запускается для каждой ассоциации, поэтому он находит соответствующий date для правильной базы
  • подумайте о лимите/смещении, дайте мне 1 (лимитную) запись, начинающуюся со следующего X (смещения). Следовательно, какая Х-я самая старая запись, а затем в основном запросе она отдает все обратно. чертовски умный
  • Получается, что если в ассоциации меньше X записей, она ничего не возвращает, так как смещение выходит за пределы записей, и впредь основной запрос не возвращает ни одной записи.

Используя приведенное выше в качестве шаблона, я пришел к следующему ответу. Первоначальная защита запроса/счетчика возникает из-за того, что если связанные записи меньше смещения, ничего не найдено. Кроме того, мне нужно было добавить внешнее соединение на случай, если ассоциаций тоже нет.

В конце концов, я обнаружил, что этот запрос является чем-то вроде вуду ORM, и не хотел идти по этому пути. Вместо этого я исключаю histories из сериализатора устройства и требую второго поиска history с использованием идентификатора device. Этот набор можно разбивать на страницы, что делает все немного чище.

Оба метода работают, все сводится к тому, что why вам нужно выполнить один запрос, а не пару. В приведенном выше, вероятно, были бизнес-причины более эффективно получить все обратно с помощью одного запроса. В моем случае удобочитаемость и условность превзошли вуду.

@classmethod
    def get_limited_histories(cls, uuid, limit=10):

        count = DeviceHistory.query.filter(DeviceHistory.device_id == uuid).count()

        if count > limit:
            sq = db.session.query(DeviceHistory.created_at) \
                .filter(DeviceHistory.device_id == Device.uuid) \
                .order_by(DeviceHistory.created_at.desc()) \
                .limit(1).offset(limit).correlate(Device)


        return db.session.query(Device).filter(Device.uuid == uuid) \
                .outerjoin(DeviceHistory,
                    and_(DeviceHistory.device_id == Device.uuid, DeviceHistory.created_at > sq)) \
                .options(contains_eager(Device.device_histories)).all()[0]

Затем он ведет себя аналогично Device.query.get(id), но Device.get_limited_histories(id)

  • НАСЛАЖДАТЬСЯ
person cgmckeever    schedule 08.09.2019