Что можно использовать в качестве тестовой заглушки для CassandraWriterBolt?

Я прочитал json от Kafka, FieldExtractionBolt читает, что json извлекает данные в значения кортежа и передает их CassandraWriterBolt, который, в свою очередь, записывает запись в Cassandra, записывая все эти значения кортежа в отдельные столбцы.

Сообщение JSON о Кафке -

{"pair":"GBPJPY","bid":134.4563,"ask":134.4354}

FieldExtractionBolt -

String message = tuple.getStringByField("message");
Map values = new Gson().fromJson(message, Map.class);
basicOutputCollector.emit(new Values(values.get("pair"), values.get("bid"), values.get("ask")));

КассандраПисательБолт -

return (CassandraWriterBolt) new CassandraWriterBolt(async(simpleQuery("INSERT INTO currency(pair, ask, bid) VALUES (?, ?, ?);").with(fields("pair", "ask", "bid")))

Я попытался написать тест на основе приведенного здесь ответа - Как E2E-тестировать функциональность Storm Topology путем программной вставки сообщений

В моем проекте я определяю все свои болты, носики и потоки в конфигурации Spring. Это делает написание/чтение моей топологии очень простым. Я строю топологию, получая болты, носики и потоковые компоненты из ApplicationContext. В моей конфигурации Spring KafkaSpout и CassandraWriterBolt определены в профиле «prod», поэтому они могут использоваться только в prod, а в профиле «test» я определяю заглушки для KafkaSpout и CassandraWriterBolt. Для KafkaSpout я использовал FixedToupleSpout, а для CassandraWriterBolt — TestWordCounter.

это мой тест

        @Test
        public void testTopology(){
        StormTopology topology = SpringBasedTopologyBuilder.getInstance().buildStormTopologyUsingApplicationContext(applicationContext);
        TestJob COMPLETE_TOPOLOGY_TESTJOB = (cluster) -> {
              MockedSources mocked = new MockedSources();
                    mocked.addMockData("kafkaSpout",
                new Values("{\"pair\":\"GBPJPY\",\"bid\":134.4563,\"ask\":134.4354}"),
        new Values("{\"pair\":\"GBPUSD\",\"bid\":1.4563,\"ask\":1.4354}"));

        Config topoConf = new Config();
        topoConf.setNumWorkers(2);

        CompleteTopologyParam ctp = new CompleteTopologyParam();
        ctp.setMockedSources(mocked);
        ctp.setStormConf(topoConf);
        Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, ctp);
                    List<List<Object>> cassandraTuples = Testing.readTuples(results, "cassandraWriterBolt");
        List<List<Object>> expectedCassandraTuples = Arrays.asList(Arrays.asList("GBPJPY", 1), Arrays.asList("GBPUSD", 1),
                Arrays.asList("134.4563", 1), Arrays.asList("1.4563", 1), Arrays.asList("134.4354", 2));
        assertTrue(expectedCassandraTuples + " expected, but found " + cassandraTuples,
                Testing.multiseteq(expectedCassandraTuples, cassandraTuples));
    MkClusterParam param = new MkClusterParam();
    param.setSupervisors(4);

    Testing.withSimulatedTimeLocalCluster(param, COMPLETE_TOPOLOGY_TESTJOB);
}

@Configuration
@Import(MainApplication.class)
public static class TestConfig
{
    @Bean
    public IRichSpout kafkaSpout(){
        return new FixedTupleSpout(Arrays.asList(new FixedTuple(Arrays.asList("{\"pair\":\"GBPJPY\",\"bid\":134.4563,\"ask\":134.4354"))), new Fields(new String[]{"message"}));
    }

    @Bean
    public IBasicBolt cassandraWriterBolt(){
        return new TestWordCounter();
    }
}

Результат, который я получаю, не то, что я ожидаю. Я получаю следующую ошибку -

        java.lang.AssertionError: [[GBPJPY, 1], [GBPUSD, 1], [134.4563, 1], [1.4563, 1], [134.4354, 2]] expected, but found [[GBPJPY, 1], [GBPUSD, 1]]

Похоже, TestWordCounter просто считывает первое значение как кортеж (только валютная пара, пропуская бид и аск). Кажется, TestWordCounter здесь не подходит. Какой будет правильная заглушка для CassandraWriterBolt, чтобы я мог утверждать, что он получит 2 записи, одну для GBPJPY, а другую для GBPUSD с их ценой покупки и продажи?


person Prashant Bhardwaj    schedule 28.07.2019    source источник


Ответы (1)


Testing.readTuples(results, "cassandraWriterBolt") вернет кортежи, созданные "cassandraWriterBolt". Это то, что вы пытаетесь проверить? Я думаю, вы пытаетесь утверждать, какие кортежи получает «cassandraWriterBolt», а не то, что он испускает.

Здесь вы можете сделать две вещи. Вы можете использовать readTuples для чтения из болтов, которые излучаются в болт Кассандры, вместо чтения из болта Кассандры. Это достойное решение, если ваша топология проста (например, не так много разных болтов, записывающих болт Кассандры).

Лучшее решение (ИМО) — написать простую заглушку для замены TestWordCounter. Единственное, что должен сделать болт, — это получить входной кортеж, подтвердить его и передать значения в новом кортеже.

execute(Tuple input, BasicOutputCollector collector) {
  collector.emit(input.getValues());
}

Затем вы можете использовать readTuples для чтения кортежей, которые выдает Bolt, которые будут теми же значениями, которые он получает.

person Stig Rohde Døssing    schedule 29.07.2019