Многопроцессорная обработка Cassandra не может обрабатывать объекты _thread.lock

Я попытался использовать Cassandra и multiprocessing для одновременной вставки строк (фиктивных данных) на основе примеров в

http://www.datastax.com/dev/blog/datastax-python-driver-multiprocessing-example-for-improved-bulk-data-throughput.

Это мой код

class QueryManager(object):

concurrency = 100  # chosen to match the default in execute_concurrent_with_args

def __init__(self, session, process_count=None):
    self.pool = Pool(processes=process_count, initializer=self._setup, initargs=(session,))

@classmethod
def _setup(cls, session):
    cls.session = session
    cls.prepared = cls.session.prepare("""
INSERT INTO test_table (key1, key2, key3, key4, key5) VALUES (?, ?, ?, ?, ?)
""")

def close_pool(self):
    self.pool.close()
    self.pool.join()

def get_results(self, params):
    results = self.pool.map(_multiprocess_write, (params[n:n+self.concurrency] for n in range(0, len(params), self.concurrency)))
    return list(itertools.chain(*results))

@classmethod
def _results_from_concurrent(cls, params):
    return [results[1] for results in execute_concurrent_with_args(cls.session, cls.prepared, params)]


def _multiprocess_write(params):
    return QueryManager._results_from_concurrent(params)


if __name__ == '__main__':

    processes = 2

    # connect cluster
    cluster = Cluster(contact_points=['127.0.0.1'], port=9042)
    session = cluster.connect()

    # database name is a concatenation of client_id and system_id
    keyspace_name = 'unit_test_0'

    # drop keyspace if it already exists in a cluster
    try:
        session.execute("DROP KEYSPACE IF EXISTS " + keyspace_name)
    except:
        pass

    create_keyspace_query = "CREATE KEYSPACE " + keyspace_name \
                        + " WITH replication = {'class': 'SimpleStrategy',    'replication_factor': '1'};"
    session.execute(create_keyspace_query)

    # use a session's keyspace
    session.set_keyspace(keyspace_name)

    # drop table if it already exists in the keyspace
    try:
        session.execute("DROP TABLE IF EXISTS " + "test_table")
    except:
        pass

    # create a table for invoices in the keyspace
    create_test_table = "CREATE TABLE test_table("

    keys = "key1 text,\n" \
           "key2 text,\n" \
           "key3 text,\n" \
           "key4 text,\n" \
           "key5 text,\n"

    create_invoice_table_query += keys
    create_invoice_table_query += "PRIMARY KEY (key1))"
    session.execute(create_test_table)

    qm = QueryManager(session, processes)

    params = list()
    for row in range(100000):
        key = 'test' + str(row)
        params.append([key, 'test', 'test', 'test', 'test'])

    start = time.time()
    rows = qm.get_results(params)
    delta = time.time() - start
    log.info(fm('Cassandra inserts 100k dummy rows for ', delta, ' secs'))

когда я выполнил код, я получил следующую ошибку

TypeError: can't pickle _thread.lock objects

который указал на

self.pool = Pool(processes=process_count, initializer=self._setup, initargs=(session,))

person daiyue    schedule 21.06.2016    source источник
comment
Возможная помощь для других, прибывающих сюда: stackoverflow.com/questions/44005212/   -  person bpgriner    schedule 15.08.2017


Ответы (2)


Это наводит на мысль, что вы пытаетесь сериализовать блокировку над границами IPC. Я думаю, это может быть потому, что вы предоставляете объект Session в качестве аргумента функции инициализации рабочего. Заставьте функцию init создавать новый сеанс в каждом рабочем процессе (см. Раздел «Сеанс для каждого процесса» в цитируемое вами сообщение в блоге).

person Adam Holmberg    schedule 24.10.2016

Я знаю, что на это уже есть ответ, но я хотел выделить пару изменений в пакете cassandra-driver, из-за которых этот код по-прежнему не работает должным образом с python 3.7 и пакетом cassandra-driver 3.18.0.

Если вы посмотрите на сообщение в блоге, на которое есть ссылка. Функция __init__ не передает session, но передает объект cluster. Даже cluster больше нельзя отправлять как initarg, потому что он содержит блокировку. Вам нужно создать его внутри def _setup(cls): classmethod.

Во-вторых, execute_concurrent_with_args теперь возвращает ResultSet, который также нельзя сериализовать. Старая версия пакета cassandra-driver просто вернула список объектов.

Чтобы исправить приведенный выше код, измените эти 2 раздела:

Во-первых, методы __init__ и _setup

def __init__(self, process_count=None):
    self.pool = Pool(processes=process_count, initializer=self._setup)

@classmethod
def _setup(cls):
    cluster = Cluster()
    cls.session = cluster.connect()
    cls.prepared = cls.session.prepare("""
        INSERT INTO test_table (key1, key2, key3, key4, key5) VALUES (?, ?, ?, ?, ?)
        """)

Во-вторых, метод _results_from_concurrent

@classmethod
def _results_from_concurrent(cls, params):
    return [list(results[1]) for results in execute_concurrent_with_args(cls.session, cls.prepared, params)]

Наконец, если вас интересует суть multiprocess_execute.py в исходном сообщении блога DataStax, которое работает с python3 и cassandra-driver 3.18.0, вы можете найти это здесь: https://gist.github.com/jWolo/6127b2e57c7e24740afd7a4254cc00a3

person jWoose    schedule 25.06.2019