Как дросселировать запрос записи в кассандру при работе с executeAsync?

Я использую драйвер Java Datastax 3.1.0 для подключения к кластеру cassandra, а моя версия кластера cassandra - 2.0.10. Я пишу асинхронно с КВОРУМНОЙ последовательностью.

  private final ExecutorService executorService = Executors.newFixedThreadPool(10);

  public void save(String process, int clientid, long deviceid) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      ResultSetFuture future = session.executeAsync(bs);
      Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
          logger.logInfo("successfully written");
        }

        @Override
        public void onFailure(Throwable t) {
          logger.logError("error= ", t);
        }
      }, executorService);
    } catch (Exception ex) {
      logger.logError("error= ", ex);
    }
  }

Мой вышеупомянутый метод сохранения будет вызываться из нескольких потоков на очень высокой скорости.

Вопрос:

Я хочу ограничить запрос метода executeAsync, который асинхронно записывает в Cassandra. Если я пишу на очень высокой скорости, с которой не может справиться мой кластер Cassandra, тогда он начнет выдавать ошибки, и я хочу, чтобы все мои записи успешно шли в Cassandra без каких-либо потерь.

Я видел это сообщение, в котором решением является использование Semaphore с фиксированным количеством разрешений. Но я не уверен, как и как лучше всего это реализовать. Раньше я никогда не использовал Semaphor. Это логика. Может ли кто-нибудь предоставить пример с семафором на основе моего кода или, если есть лучший способ / вариант, дайте мне знать.

В контексте написания программы загрузчика данных вы могли бы сделать что-то вроде следующего:

  • Чтобы упростить задачу, используйте семафор или какую-либо другую конструкцию с фиксированным количеством разрешений (это будет ваше максимальное количество запросов в полете). Каждый раз, когда вы отправляете запрос с помощью executeAsync, приобретайте разрешение. Вам действительно нужен только 1 поток (но, возможно, вы захотите ввести пул размером # ядер процессора, который делает это), который получает разрешения от семафора и выполняет запросы. Он будет просто блокироваться при получении, пока не появится доступное разрешение.
  • Используйте Futures.addCallback для будущего, возвращенного от executeAsync. Обратный вызов должен вызывать Sempahore.release () как в случаях onSuccess, так и в onFailure. Освобождение разрешения должно позволить вашему потоку на шаге 1 продолжить и отправить следующий запрос.

Также я видел пару других сообщений, где они говорили об использовании RingBuffer или Guava RateLimitter, поэтому какой из них лучше и мне следует использовать? Ниже приведены варианты, о которых я могу думать:

  • Использование семафора
  • Использование кольцевого буфера
  • Использование ограничителя скорости Guava

Может ли кто-нибудь помочь мне с примером того, как мы можем ограничить запрос или получить противодавление для записи кассандры и убедиться, что все записи успешно проходят в кассандру?


person john    schedule 08.12.2016    source источник


Ответы (2)


Не авторитетный ответ, но, возможно, он был бы полезен. Сначала вы должны подумать, что бы вы сделали, если запрос не может быть выполнен сразу. Независимо от того, какое ограничение скорости вы выбрали, если вы получаете запросы с большей скоростью, чем вы можете писать в Cassandra, в конечном итоге вы забьете свой процесс ожидающими запросами. И в этот момент вам нужно будет сказать вашим клиентам, чтобы они на время задерживали свои запросы («оттолкнуть»). Например. если они поступают через HTTP, статус ответа будет 429 «Слишком много запросов». Если вы генерируете запросы в одном процессе, тогда решите, какой максимальный тайм-аут будет приемлемым. Тем не менее, если Кассандра не успевает, то пора ее масштабировать (или настраивать).

Возможно, перед введением ограничений скорости стоит поэкспериментировать и добавить искусственные задержки в ваши потоки перед вызовом метода save (с использованием Thread.sleep (...)) и посмотреть, решит ли он вашу проблему или нужно что-то еще.

Ошибка возврата запроса является противодавлением от Кассандры. Но вы можете выбрать или реализовать RetryPolicy, чтобы определить, когда повторять неудачные запросы.

Также вы можете посмотреть параметры пула соединений (и особенно Мониторинг и настройка пула). Можно настроить количество асинхронных запросов на соединение. Однако в документации говорится, что для Cassandra 2.x этот параметр ограничен 128, и его не следует изменять (хотя я бы поэкспериментировал с этим :)

Реализация с семафором выглядит как

/* Share it among all threads or associate with a thread for per-thread limits
   Number of permits is to be tuned depending on acceptable load.
*/
final Semaphore queryPermits = new Semaphore(20); 


public void save(String process, int clientid, long deviceid) {
  ....
  queryPermits.acquire(); // Blocks until a permit is available

  ResultSetFuture future = session.executeAsync(bs);
  Futures.addCallback(future, new FutureCallback<ResultSet>() {
    @Override
    public void onSuccess(ResultSet result) {
      queryPermits.release();
      logger.logInfo("successfully written");
    }
    @Override
    public void onFailure(Throwable t) {
      queryPermits.release(); // Permit should be released in all cases.
      logger.logError("error= ", t);
    }
  }, executorService);
  ....
}

(В реальном коде я бы создал обратный вызов оболочки, который бы выпускал разрешения, а затем вызывал обернутые методы)

RateLimiter Guava похож на семафор, но допускает временные всплески после периодов недостаточного использования и ограничивает запросы на основе времени (а не общего количества активных запросов).

Однако запросы все равно не будут выполняться по разным причинам, поэтому, вероятно, лучше иметь план, как повторить их (в случае периодических ошибок).

В вашем случае это может быть неприемлемо, но я бы попытался использовать какую-нибудь очередь или буфер для постановки запросов в очередь (например, java.util.concurrent.ArrayBlockingQueue). «Буфер заполнен» означает, что клиенты должны подождать или отказаться от запроса. Буфер также будет использоваться для повторной постановки в очередь неудавшихся запросов. Однако, чтобы быть более справедливым, неудавшиеся запросы, вероятно, следует помещать в начало очереди, чтобы они сначала выполнялись повторно. Также нужно как-то справляться с ситуацией, когда очередь заполнена и одновременно появляются новые неудавшиеся запросы. Затем однопоточный воркер будет выбирать запросы из очереди и отправлять их в Cassandra. Поскольку это не должно делать много, маловероятно, что это станет узким местом. Этот воркер также может применять свои собственные ограничения скорости, например на основе времени с com.google.common.util.concurrent.RateLimiter.

Если кто-то хочет максимально избежать потери сообщений, он может поставить посредника сообщений с настойчивостью (например, Kafka) перед Cassandra. Таким образом, входящие сообщения могут выдержать даже длительные перерывы в работе Cassandra. Но, думаю, в твоем случае это излишество.

person Petr Gladkikh    schedule 02.01.2017
comment
Как вы думаете, можете ли вы предоставить мне пример очереди или примера буфера, который вы мне дали? Думаю, это лучше всего подойдет мне в моем сценарии. - person john; 20.01.2017

Простое использование очереди с блокировкой должно подойти. Фьючерсы являются многопоточными, и обратный вызов (успешный и неудачный) будет действовать как потребитель, и где бы вы ни вызывали метод сохранения, он будет действовать как производитель.

Еще лучше будет, если вы поместите полный запрос в очередь и удаляете его один за другим, сохраняя при каждом удалении.

private final ExecutorService executorService = Executors.newFixedThreadPool(10);

public void save(String process, int clientid, long deviceid, BlockingQueue<Object> queue) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      ResultSetFuture future = session.executeAsync(bs);
      Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
          logger.logInfo("successfully written");
          queue.take();
        }

        @Override
        public void onFailure(Throwable t) {
          logger.logError("error= ", t);
          queue.take();
        }
      }, executorService);
    } catch (Exception ex) {
      logger.logError("error= ", ex);
    }
}

public void invokeSaveInLoop(){
    Object dummyObj = new Object();
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(20);;
    for(int i=0; i< 1000; i++){
        save("process", clientid, deviceid, queue);
        queue.put(dummyObj);
    }
}

Если вы хотите пойти дальше и проверить нагрузку на кластер на полпути

public static String getCurrentState(){    
StringBuilder response = new StringBuilder();
            response.append("Current Database Connection Status <br>\n ---------------------------------------------<br>\n");
            final LoadBalancingPolicy loadBalancingPolicy =
                    cluster.getConfiguration().getPolicies().getLoadBalancingPolicy();
            final PoolingOptions poolingOptions =
                    cluster.getConfiguration().getPoolingOptions();
            Session.State state = session.getState();
            for (Host host : state.getConnectedHosts()) {
                HostDistance distance = loadBalancingPolicy.distance(host);
                int connections = state.getOpenConnections(host);
                int inFlightQueries = state.getInFlightQueries(host);
                response.append(String.format("%s current connections=%d, max allowed connections=%d, current load=%d, max load=%d%n",
                                host, connections, poolingOptions.getMaxConnectionsPerHost(distance), inFlightQueries,
                                connections *
                                        poolingOptions.getMaxRequestsPerConnection(distance)))
                        .append("<br>\n");
            }
            return response.toString();
}
person Abhishek Anand    schedule 08.01.2017