Riemann - Динамическое создание потока по карте

У меня есть следующая функция, которая получает карту с именем службы и порогом. Он проверяет, пересекла ли служба определенный порог, а затем вызывает несколько нижестоящих дочерних элементов для события.

(defn tc
  [s & children]
   (where
     (and (service (:service_name s)) (not (expired? event)))
       (by [:host :service]
         (where (> metric (:threshold s)
           (with :state "critical" 
             (apply sdo children)))))))

Я хотел бы динамически построить поток, используя вектор карт:

(def services [{:service "cpu/usage" :threshold 90}
               {:service "memory/usage" :threshold 90}])

При попытке запустить его в потоке я получаю следующее предупреждение:

(streams
  (doseq [s services] (tc s prn)))

WARN [2015-01-05 14:27:07,187] Thread-15 - riemann.core - instrumentation service caught
java.lang.NullPointerException
  at riemann.core$stream_BANG_$fn__11140.invoke(core.clj:19)
  at riemann.core$stream_BANG_.invoke(core.clj:18)
  at riemann.core$instrumentation_service$measure__11149.invoke(core.clj:57)
  at riemann.service.ThreadService$thread_service_runner__8782$fn__8783.invoke(service.clj:66)
  at riemann.service.ThreadService$thread_service_runner__8782.invoke(service.clj:65)
  at clojure.lang.AFn.run(AFn.java:22)
  at java.lang.Thread.run(Thread.java:701)

Это работает, если я запускаю функцию потоков внутри дозы. Этот работает и дает следующий результат:

(doseq [s services]
  (streams (tc s prn)))

#riemann.codec.Event{:host "testhost", :service "memory/usage", :state "critical", :description nil, :metric 91.0, :tags nil, :time 1420460856, :ttl 60.0}

person Igor Liner    schedule 05.01.2015    source источник
comment
это ваши точные события или они были опущены для вопроса?   -  person Arthur Ulfeldt    schedule 06.01.2015
comment
@ArthurUlfeldt, я не уверен, что проблема в событии. Поскольку функция работает, если я использую следующий код (doseq [s services] (streams (tc s prn))). Я не хочу создавать поток для каждой службы, но один поток для всех служб.   -  person Igor Liner    schedule 06.01.2015
comment
(with :service "everything" ... process combined stream here ..) объединил бы все в одну услугу   -  person Arthur Ulfeldt    schedule 06.01.2015
comment
@ArthurUlfeldt, я бы хотел иметь вектор карт. Каждая карта описывает услугу и порог. Я хотел бы проверить, что если служба пересекает определенный порог, то состояние события устанавливается как критическое. Что я пытаюсь сделать, так это запустить функцию потоков, используя этот набор данных.   -  person Igor Liner    schedule 07.01.2015


Ответы (1)


Кажется, он взрывается, если в ваших событиях нет всех обязательных полей, вот пример из аналогичного проекта, где я создаю событие из последовательности событий (сокращение). Это не совсем то, что вы делаете, хотя я генерирую события таким же образом:

{:service (:service (first events))
 :metric (->> events count)
 :host "All-counts"
 :state "OK"
 :time (:time (last events))
 :ttl default-interval}

Я получил NPE конкретно, когда времени не хватало. Если вы не можете где-то наследовать его форму, просто создайте его (например, используйте now) без разумного значения здесь, срок действия события не будет работать, и у вас закончится оперативная память.

person Arthur Ulfeldt    schedule 05.01.2015