Курсоры SQLAlchemy: AttributeError: объект 'tuple' не имеет атрибута 'items'

Я пытаюсь написать итератор, который будет принимать запрос и экспортировать строки пакетами в виде списка dicts. Вот полный пример того, с чем я работаю:

class QueryStream(collections.Iterator):
    def __init__(self, conn_details, query, max_rows=None, batch_size=2000):
        # Initialize vars.
        self.engine = dst.get_connection(conn_details)
        self.query = query
        self.max_rows = max_rows
        self.batch_size = batch_size
        self.fetched_rows = 0

        # Create a database cursor from query.
        self.conn = self.engine.raw_connection()
        self.cursor = self.conn.cursor()
        self.cursor.execute(self.query)

    def next(self):
        try:
            if self.max_rows:
                if self.max_rows <= self.fetched_rows:
                    # Maximum rows has been fetched, so stop iterating.
                    raise StopIteration
                elif self.max_rows <= self.batch_size:
                    # Max rowset is small enough to be done in one batch.
                    batch_size = self.max_rows
                elif self.max_rows - self.fetched_rows < self.batch_size:
                    # On the final batch, must fetch only remaining rows.
                    batch_size = self.max_rows - self.fetched_rows
            else:
                # Get default batch size.
                batch_size = self.batch_size

            batch = [dict(row.items()) for row in self.cursor.fetchmany(batch_size)]
            if len(batch):  # 0 rows were returned, so we're probs at the end.
                self.fetched_rows += len(batch)
                print('Fetch {} rows so far.'.format(repr(self.fetched_rows)), file=sys.stderr)
                return batch
            else:
                raise StopIteration
        except StopIteration:
            self.cursor.close()
            self.conn.close()
            self.engine.dispose()
            raise StopIteration

Проблема в этой строке:

batch = [dict(row.items()) for row in self.cursor.fetchmany(batch_size)]

выдает эту ошибку:

AttributeError: 'tuple' object has no attribute 'items'

У меня создалось впечатление, что объект RowProxy будет возвращен для каждого результата в наборе результатов, который поддерживает dict-подобные операции (как указано в этом сообщении), но похоже, что результаты представляют собой простые кортежи. Документы SQLAlchemy не на 100% ясны относительно ожидаемого типа результатов от курсора, предлагая только этот пример использования.

Вопрос: Я что-то не так делаю с использованием курсора? Мне нужны результаты, представляющие собой список словарей с именами столбцов в качестве ключей, но я не знаю, возможно ли это без RowProxy вместо кортежей.


person Garrett Bates    schedule 31.07.2019    source источник


Ответы (1)


Что ж, похоже, проблема связана с использованием сырого соединения:

        # Create a database cursor from query.
        self.conn = self.engine.raw_connection()
        self.cursor = self.conn.cursor()
        self.cursor.execute(self.query)

        ...

        batch = [dict(row.items()) for row in self.cursor.fetchmany(batch_size)]

был заменен на

        # Create a database cursor from query.
        self.engine = dst.get_connection(workspace_uuid, project_id)
        self.stream = self.engine.execution_options(stream_results=True).execute(self.query)

        ...

        batch = [dict(row.items()) for row in self.stream.fetchmany(batch_size)]

И все хорошо. К счастью, я использую драйвер, который поддерживает stream_results (psycopg2):

stream_results - Доступно: соединение, выписка. Укажите диалекту, что результаты должны быть «потоковыми», а не предварительно буферизоваться, если это возможно. Это ограничение многих DBAPI. В настоящее время флаг распознается только диалектами psycopg2, mysqldb и pymysql.

person Garrett Bates    schedule 31.07.2019