Simmer in R: Моделирование изменений производительности сервера в зависимости от длины и продолжительности очереди

Я пытаюсь смоделировать систему следующим образом:

Прибытия генерируются в соответствии с заранее определенным расписанием и имеют известное время обработки, предоставляемое кадром данных. В начале симуляции имеется единственный сервер с мощностью, равной min_daemons. Пока все просто, но с nxt все сложнее: эта мощность может меняться на интервале [min_daemons, max_daemons] на протяжении всей симуляции по следующему алгоритму:

Если в любой момент моделирования длина очереди достигает или превышает значение incr_count и остается на этом уровне или выше для incr_delay, то к основному серверу добавляется дополнительная единица емкости. Это может произойти в любой момент, при условии, что мощность никогда не превышает max_daemons.

Обратное также верно. Если в любой момент длина очереди меньше, чем decr_count, и остается на уровне или ниже этого уровня для decr_delay, единица емкости удаляется, возможно, до уровня min_daemons.

Я создал траекторию для всех прибытий, которая разветвляется, когда выполняются указанные выше условия изменения мощности сервера. Проблема в том, что изменения пропускной способности всегда связаны с событием прибытия. Чего я действительно хочу, так это процесса, независимого от траектории прибытия, который постоянно отслеживает длину очереди и вносит соответствующие изменения в пропускную способность.

В качестве альтернативы я рассматривал вариант выполнения этого с помощью какого-то фиктивного процесса прибытия, скажем, каждую секунду моделирования, но я не был уверен, смогу ли я предотвратить конкуренцию фиктивных прибытий с реальными поступлениями за пропускную способность сервера.

#instantiate simulation environment
env <- simmer("queues") %>% add_resource("daemon",1) %>% add_global("incr_start",99999) %>% add_global("decr_start",99999)

run <-  trajectory() %>% 
  branch(option = function() if (get_queue_count(env,"daemon") >=  incr_count) {1}
                             else if (get_queue_count(env,"daemon") <= decr_count) {2}
                             else {3}
         ,
         continue = c(T, T, T)
         ,
         trajectory("increment?")
         %>% branch(option = function() if (now(env) - get_global(env,"incr_start") >= incr_delay
                                            & get_capacity(env,"daemon") < max_daemons) {1}
                                        else if (get_global(env,"incr_start")==99999) {2}
                                        else {3}
                    ,
                    continue = c(T, T, T)
                    ,
                    trajectory("increment")
                    %>% log_(function () {paste("Queue size is: ",get_queue_count(env,"daemon"))})
                    %>% log_(function () 
                      {paste("Queue has exceeded count for ",now(env)-get_global(env,"incr_start")," seconds.")})
                    %>% set_capacity(resource = "daemon", value = 1, mod="+")
                    ,
                    trajectory("update incr start")
                    %>% set_global("incr_start",now(env))
                    %>% log_("Queue count is now above increment count. Starting increment timer.")
                    ,
                    trajectory("do nothing")
                    %>% log_("Did not meet increment criteria. Doing nothing.")
         )
         ,
         trajectory("decrement?")
         %>% branch(option = function() if (now(env) - get_global(env,"decr_start") >= decr_delay
                                            & get_capacity(env,"daemon") > min_daemons) {1}
                                        else if (get_global(env,"decr_start")==99999) {2}
                                        else {3}
                    ,
                    continue = c(T, T, T)
                    ,
                    trajectory("decrement")
                    %>% log_(function () {paste("Queue size is: ",get_queue_count(env,"daemon"))})
                    %>% log_(function () 
                      {paste("Queue has been less than count for ",now(env)-get_global(env,"incr_start")," seconds.")})
                    %>% set_capacity(resource = "daemon", value = -1, mod="+")
                    ,
                    trajectory("update decr start")
                    %>% set_global("decr_start",now(env))
                    %>% log_("Queue count is now below decrement count. Starting decrement timer.")
                    ,
                    trajectory("do nothing")
                    %>% log_("Did not meet decrement criteria. Doing nothing.")
         )
         ,
         trajectory("reset_timer")
         %>% log_("Did not meet criteria to increment or decrement. Resetting timers.")
         %>% set_global("decr_start", values = 99999)
         %>% set_global("decr_start", values = 99999)
  ) %>%
  seize("daemon") %>% 
  log_("Now running") %>%
  log_(function () {paste(get_queue_count(env,"daemon")," runs in the queue.")}) %>%
  timeout_from_attribute("service") %>% 
  release("daemon") %>% 
  log_("Run complete")

env %>% 
  add_dataframe("run", run, arr,time="absolute") %>% 
  run(200)

Мне нужно выполнить дополнительную отладку, чтобы убедиться, что симуляция работает так, как я задумал, но я полностью понимаю, что эта модель неверна. Я могу надеяться, что дизайн не слишком скомпрометирует его достоверность, но я также хочу получить обратную связь о том, как я могу создать что-то более близкое к реальной жизни.


person Jack Rossi    schedule 26.03.2019    source источник
comment
В чем вопрос? Где ваша модель работает неправильно? Откуда вы знаете, что это неправильно?   -  person Corey Levinson    schedule 26.03.2019
comment
Пункты 5 и 6 - вопрос. Мой код выполняется без ошибок, но модель неверна, и я не могу понять, как правильно реализовать ее стандартными инструментами симмера. Настоящая система постоянно проверяет время, когда длина очереди превышает диапазон длин в течение определенного периода времени, и в этот момент она регулирует мощность сервера. Моя модель способна выполнять эту настройку только при поступлении нового товара.   -  person Jack Rossi    schedule 26.03.2019


Ответы (1)


Проверка состояния через регулярные промежутки времени сводит на нет весь смысл дискретных событий. Здесь у нас асинхронный процесс, поэтому правильный способ его моделирования — использование сигналов.

Нам нужно спросить себя: когда очередь могла...

  1. увеличивать? Когда прибытие достигает активности seize(). Поэтому, прежде чем пытаться захватить ресурс, нам нужно проверить количество поставленных в очередь поступлений и действовать соответствующим образом:

    • If it's equal to decr_count, a signal must be sent to cancel any attempt to decrease the server's capacity.
    • Если он равен incr_count - 1, необходимо отправить сигнал для запроса увеличения мощности сервера.
    • Ничего не делать иначе.
  2. снижаться? Когда прибытие обслуживается (т. е. переходит к следующему действию после seize(). Таким образом, после захвата ресурса нам также необходимо проверить количество поставленных в очередь прибытий:

    • If it's equal to incr_count - 1, a signal must be sent to cancel any attempt to increase the server's capacity.
    • Если он равен decr_count, необходимо отправить сигнал на запрос уменьшения мощности сервера.
    • Ничего не делать иначе.

Процедура проверки количества поставленных в очередь прибытий и принятия решения о том, какой сигнал требуется, если таковой имеется, может быть объединена в многократно используемую функцию (назовем ее check_queue) следующим образом:

library(simmer)

env <- simmer()

check_queue <- function(.trj, resource, mod, lim_queue, lim_server) {
  .trj %>% branch(
    function() {
      if (get_queue_count(env, resource) == lim_queue[1])
        return(1)
      if (get_queue_count(env, resource) == lim_queue[2] &&
          get_capacity(env, resource)    != lim_server)
        return(2)
      0 # pass
    },
    continue = c(TRUE, TRUE),
    trajectory() %>% send(paste("cancel", mod[1])),
    trajectory() %>% send(mod[2])
  )
}

main <- trajectory() %>%
  check_queue("resource", c("-", "+"), c(decr_count, incr_count-1), max_daemons) %>%
  seize("resource") %>%
  check_queue("resource", c("+", "-"), c(incr_count-1, decr_count), min_daemons) %>%
  timeout_from_attribute("service") %>%
  release("resource")

Таким образом, основная траектория довольно проста. Затем нам нужна пара процессов для получения таких сигналов и увеличения/уменьшения емкости после некоторой задержки:

change_capacity <- function(resource, mod, delay, limit) {
  trajectory() %>%
    untrap(paste("cancel", mod)) %>%
    trap(mod) %>%
    wait() %>%
    # signal received
    untrap(mod) %>%
    trap(paste("cancel", mod),
         handler = trajectory() %>%
           # cancelled! start from the beginning
           rollback(Inf)) %>%
    timeout(delay) %>%
    set_capacity(resource, as.numeric(paste0(mod, 1)), mod="+") %>%
    # do we need to keep changing the capacity?
    rollback(2, check=function() get_capacity(env, resource) != limit) %>%
    # start from the beginning
    rollback(Inf)
}

incr_capacity <- change_capacity("resource", "+", incr_delay, max_daemons)
decr_capacity <- change_capacity("resource", "-", decr_delay, min_daemons)

Наконец, мы добавляем ресурс, наши процессы и данные в среду моделирования:

env %>%
  add_resource("resource", min_daemons) %>%
  add_generator("incr", incr_capacity, at(0)) %>%
  add_generator("decr", decr_capacity, at(0)) %>%
  add_dataframe("arrival", main, data)

Пожалуйста, обратите внимание, что я не проверял этот код. Это может потребовать некоторых корректировок, но общая идея есть.

person Iñaki Úcar    schedule 27.03.2019
comment
Большое спасибо за это! Я нашел ваше объяснение чрезвычайно полезным, и ваш код удивительно функционален. Единственное, что мне пришлось изменить, это функцию проверки в откате, чтобы она снова проверяла, что очередь больше счетчика приращений или меньше счетчика декрементов. - person Jack Rossi; 09.05.2019