Spring не может распространить транзакцию на ForkJoin RecursiveAction

Я пытаюсь реализовать многопоточное решение, чтобы распараллелить свою бизнес-логику, включающую чтение и запись в базу данных.

Стек технологий: Spring 4.0.2, Hibernate 4.3.8

Вот код для обсуждения:

Конфигурация

@Configuration
public class PartitionersConfig {

    @Bean
    public ForkJoinPoolFactoryBean forkJoinPoolFactoryBean() {
        final ForkJoinPoolFactoryBean poolFactory = new ForkJoinPoolFactoryBean();
        return poolFactory;
    }
}

Услуга

@Service
@Transactional
public class MyService {

    @Autowired
    private OtherService otherService;

    @Autowired
    private ForkJoinPool forkJoinPool;

    @Autowired
    private MyDao myDao;

    public void performPartitionedActionOnIds() {
        final ArrayList<UUID> ids = otherService.getIds();

        MyIdPartitioner task = new MyIdsPartitioner(ids, myDao, 0, ids.size() - 1);
        forkJoinPool.invoke(task);
    }
}

Репозиторий / ДАО

@Repository
@Transactional(propagation = Propagation.MANDATORY)
public class IdsDao {

    public MyData getData(List<UUID> list) {
        // ... 
    }
}

Рекурсивное действие

public class MyIdsPartitioner extends RecursiveAction {

    private static final long serialVersionUID = 1L;
    private static final int THRESHOLD = 100;

    private ArrayList<UUID> ids;
    private int fromIndex;
    private int toIndex;

    private MyDao myDao;

    public MyIdsPartitioner(ArrayList<UUID> ids, MyDao myDao, int fromIndex, int toIndex) {
        this.ids = ids;
        this.fromIndex = fromIndex;
        this.toIndex = toIndex;
        this.myDao = myDao;
    }

    @Override
    protected void compute() {
        if (computationSetIsSamllEnough()) {
            computeDirectly();
        } else {
            int leftToIndex = fromIndex + (toIndex - fromIndex) / 2;
            MyIdsPartitioner leftPartitioner = new MyIdsPartitioner(ids, myDao, fromIndex, leftToIndex);
            MyIdsPartitioner rightPartitioner = new MyIdsPartitioner(ids, myDao, leftToIndex + 1, toIndex);

            invokeAll(leftPartitioner, rightPartitioner);
        }
    }

    private boolean computationSetIsSamllEnough() {
        return (toIndex - fromIndex) < THRESHOLD;
    }

    private void computeDirectly() {
        final List<UUID> subList = ids.subList(fromIndex, toIndex);
        final MyData myData = myDao.getData(sublist);
        modifyTheData(myData);
    }

    private void modifyTheData(MyData myData) {
        // ...
        // write to DB
    }
}

После выполнения этого я получаю:

Существующая транзакция не найдена для транзакции, помеченной как "обязательная" для распространения.

Я понял, что это совершенно нормально, поскольку транзакция не распространяется по разным потокам. Поэтому одним из решений является создание транзакции вручную в каждом потоке как предложено в другом подобном вопросе. Но меня это мало удовлетворило, и я продолжил поиски.

На форуме Spring я нашел обсуждение темы. Один абзац я нахожу очень интересным:

"Я могу представить, что можно вручную передать контекст транзакции в другой поток, но я не думаю, что вам стоит пытаться это делать. threadsafe. Использование одного единственного соединения в нескольких потоках нарушило бы основные контракты запроса/ответа jdbc, и было бы неудивительно, если бы это работало в более чем тривиальных примерах."

Итак, возникает первый вопрос: Стоит ли парализировать чтение/запись в базу данных и может ли это действительно повредить согласованности БД?
Если приведенная выше цитата не соответствует действительности, в чем я сомневаюсь, есть ли способ добиться следующего: MyIdPartitioner должен управляться Spring - с помощью @Scope("prototype") - и передавать ему необходимые аргументы для рекурсивных вызовов и, таким образом, оставить управление транзакциями Весна?


person nyxz    schedule 01.08.2015    source источник


Ответы (2)


После дальнейших чтений мне удалось решить мою проблему. Типа (как я вижу сейчас, проблем изначально не было).

Поскольку чтение, которое я делаю из БД, происходит по частям, и я уверен, что результаты не будут отредактированы в течение этого времени, я могу сделать это вне транзакции.

Запись также безопасна в моем случае, поскольку все значения, которые я записываю, уникальны, и никакие ограничения не могут быть нарушены. Поэтому я удалил транзакцию и оттуда.

Что я имею в виду, говоря «я удалил транзакцию», просто переопределите режим распространения метода в моем DAO, например:

@Repository
@Transactional(propagation = Propagation.MANDATORY)
public class IdsDao {

    @Transactional(propagation = Propagation.SUPPORTS)
    public MyData getData(List<UUID> list) {
        // ... 
    }
}

Или, если вы решите, что вам нужна транзакция по какой-то причине, вы все равно можете оставить управление транзакциями Spring, установив распространение на REQUIRED.

Так что решение оказывается намного проще, чем я думал.

И чтобы ответить на другие мои вопросы:

Стоит ли парализировать чтение/запись в базу данных и может ли это действительно повредить согласованности БД?

Да, это того стоит. И пока у вас есть транзакция на поток, вы круты.

Есть ли способ добиться следующего: MyIdPartitioner должен управляться Spring - с помощью @Scope ("prototype") - и передавать ему необходимые аргументы для рекурсивных вызовов и таким образом оставлять управление транзакциями Spring?

Да, есть способ с использованием пула (еще один вопрос о стеке). Или вы можете определить свой компонент как @Scope(value = "prototype", proxyMode = ScopedProxyMode.TARGET_CLASS), но тогда он не будет работать, если вам нужно установить для него параметры, поскольку каждое использование экземпляра даст вам новый экземпляр. Бывший.

@Autowire
MyIdsPartitioner partitioner;

public void someMethod() {
    ...
    partitioner.setIds(someIds);
    partitioner.setFromIndex(fromIndex);
    partitioner.setToIndex(toIndex);
    ...
}

Это создаст 3 экземпляра, и вы не сможете использовать объект с пользой, поскольку поля не будут установлены.

Короче говоря, есть способ, но мне не нужно было идти на него в первую очередь.

person nyxz    schedule 05.08.2015

Это должно быть возможно с атомикосом (http://www.atomikos.com) и, при необходимости, с вложенными транзакциями.

Если вы сделаете это, позаботьтесь о том, чтобы избежать взаимоблокировок, если несколько потоков одной и той же корневой транзакции записывают в одни и те же таблицы в базе данных.

person Guy Pardon    schedule 01.08.2015
comment
Хорошо, это похоже на то, на что стоит обратить внимание, но я надеялся сохранить свой технологический стек таким, какой он есть. Также это не полный ответ на оба моих вопроса. - person nyxz; 01.08.2015