Я использую драйвер Java Datastax 3.1.0 для подключения к кластеру cassandra, а моя версия кластера cassandra - 2.0.10. Я пишу асинхронно с КВОРУМНОЙ последовательностью.
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
public void save(String process, int clientid, long deviceid) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
}
}, executorService);
} catch (Exception ex) {
logger.logError("error= ", ex);
}
}
Мой вышеупомянутый метод сохранения будет вызываться из нескольких потоков на очень высокой скорости.
Вопрос:
Я хочу ограничить запрос метода executeAsync
, который асинхронно записывает в Cassandra. Если я пишу на очень высокой скорости, с которой не может справиться мой кластер Cassandra, тогда он начнет выдавать ошибки, и я хочу, чтобы все мои записи успешно шли в Cassandra без каких-либо потерь.
Я видел это сообщение, в котором решением является использование Semaphore
с фиксированным количеством разрешений. Но я не уверен, как и как лучше всего это реализовать. Раньше я никогда не использовал Semaphor. Это логика. Может ли кто-нибудь предоставить пример с семафором на основе моего кода или, если есть лучший способ / вариант, дайте мне знать.
В контексте написания программы загрузчика данных вы могли бы сделать что-то вроде следующего:
- Чтобы упростить задачу, используйте семафор или какую-либо другую конструкцию с фиксированным количеством разрешений (это будет ваше максимальное количество запросов в полете). Каждый раз, когда вы отправляете запрос с помощью executeAsync, приобретайте разрешение. Вам действительно нужен только 1 поток (но, возможно, вы захотите ввести пул размером # ядер процессора, который делает это), который получает разрешения от семафора и выполняет запросы. Он будет просто блокироваться при получении, пока не появится доступное разрешение.
- Используйте Futures.addCallback для будущего, возвращенного от executeAsync. Обратный вызов должен вызывать Sempahore.release () как в случаях onSuccess, так и в onFailure. Освобождение разрешения должно позволить вашему потоку на шаге 1 продолжить и отправить следующий запрос.
Также я видел пару других сообщений, где они говорили об использовании RingBuffer
или Guava RateLimitter
, поэтому какой из них лучше и мне следует использовать? Ниже приведены варианты, о которых я могу думать:
- Использование семафора
- Использование кольцевого буфера
- Использование ограничителя скорости Guava
Может ли кто-нибудь помочь мне с примером того, как мы можем ограничить запрос или получить противодавление для записи кассандры и убедиться, что все записи успешно проходят в кассандру?