Apache Camel: загрузка нескольких файлов одновременно с использованием компонента SFTP

на sftp у меня есть несколько файлов со следующими именами xyz:

40_20200313_0cd6963f-bf5b-4eb0-b310-255a23ed778e_p.dat
123_20200313_0cd6963f-bf5b-4eb0-b310-255a23ed778e_p.dat
etc.

Я хочу, чтобы верблюд загружал все файлы сразу, так как в настоящее время он загружает файл один за другим.

Ниже приведен маршрут верблюда и запрос:

    private static String regex() {
        return "(22|23|24|25|26|28|29|32|35|40|41|46|52|70|85|88|123)_(?:.*)_p.dat";
    }

    private static String sftpComponent() {
        return "sftp://transit.ergogroup.no/Eyeshare/From_Eyeshare_Test"
                + "?username=Eyeshare_test"
                + "&password=epw3ePOugG" // Stored on wildfly server
                + "&download=true" //Shall be read chunk by chunk to avoid heap space issues. Earlier download=true was used: Harpreet
                + "&useList=true"
                + "&stepwise=false"
                + "&disconnect=true"
                + "&passiveMode=true"
                + "&reconnectDelay=10000"
//              + "&bridgeErrorHandler=true"
                + "&delay=300000"
                //+ "&fileName=" + sftpFileName
//              + "&include=kiki\\.txt"
//              + "&include=40_*_p\\.dat"sss
                + "&include="+regex()
                + "&preMove=$simple{file:onlyname}.$simple{date:now:yyyy-MM-dd'T'hh-mm-ss}.processing"
                + "&move=$simple{file:onlyname.noext}.$simple{date:now:yyyy-MM-dd'T'hh-mm-ss}.success"
                + "&moveFailed=$simple{file:onlyname.noext}.$simple{date:now:yyyy-MM-dd'T'hh-mm-ss}.failed";
//              + "&idempotentRepository=#infinispan"
//              + "&readLockRemoveOnCommit=true";
    }

   from(sftpComponent()).log("CHU").to(archiveReceivedFile())

Код выглядит нормально, но вывод - нет. Любой любезно предложить


person fatherazrael    schedule 18.03.2020    source источник
comment
Почему вы задаете много вопросов и игнорируете ответы на них? Что, если у другого возникнут такие же проблемы и он найдет ваши вопросы?   -  person c0ld    schedule 18.03.2020
comment
@c0ld: Это не так. Вы можете проверить всю мою историю с самого начала. Я принимаю ответы, пробую что-то, некоторые отложены, а также даю свои ответы. Как я могу принимать ответы без необходимости и сбивать с толку других? Прошу вас пройти всю благодарность   -  person fatherazrael    schedule 18.03.2020
comment
Я открываю ваш профиль, и последние 10 вопросов белые =) Да, некоторые из них без ответов, но с комментариями-помощниками. Хорошо, это твое решение. Вернемся к вопросу. Учитывая имя archiveReceivedFile(), вы хотите создать архив из потребляемой партии файлов? Просто нужно узнать больше о случае, потому что одновременное потребление неизвестного количества файлов - не очень хорошая идея.   -  person c0ld    schedule 18.03.2020
comment
@c0ld: ваше понимание правильное. Хорошо, так что я должен потреблять его по одному?   -  person fatherazrael    schedule 19.03.2020


Ответы (1)


Вот пример агрегатора:

from("file:///somePath/consume/?maxMessagesPerPoll=2&delay=5000")
            .aggregate(constant(true), new ZipAggregationStrategy()).completion(exchange -> exchange.getProperty("CamelBatchComplete", Boolean.class))
            .to("file:///somePath/produce/")

Здесь maxMessagesPerPoll определяет, сколько файлов будет заархивировано. Но если их количество в папке меньше, чем значение maxMessagesPerPoll, он будет ждать отсутствующих файлов для полного архива. Вот пример ZipAggregationStrategy:

private static class ZipAggregationStrategy implements AggregationStrategy {
    private ZipOutputStream zipOutputStream;
    private ByteArrayOutputStream out;
    @Override
    public Exchange aggregate(final Exchange oldExchange, final Exchange newExchange) {
        try {
            if (oldExchange == null) {
                out = new ByteArrayOutputStream();
                zipOutputStream = new ZipOutputStream(out);
            }
            createEntry(newExchange);
            return newExchange;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    private void createEntry(final Exchange exchange) throws Exception {
        final ZipEntry zipEntry = new ZipEntry(exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
        zipOutputStream.putNextEntry(zipEntry);
        byte[] bytes = new byte[1024];
        int length;
        try (InputStream body = exchange.getIn().getBody(InputStream.class)) {
            while ((length = body.read(bytes)) >= 0) {
                zipOutputStream.write(bytes, 0, length);
            }
        }
    }
    @Override
    public void onCompletion(final Exchange exchange) {
        try {
            zipOutputStream.close();
            exchange.getIn().setBody(new ByteArrayInputStream(out.toByteArray()));
            exchange.getIn().setHeader(Exchange.FILE_NAME, "someArchive.zip");
        }catch (Exception e){
            throw new RuntimeException(e);
        } finally {
            try {
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Это пример в памяти. Вы можете улучшить его, например, с помощью временного файла. И вы всегда можете создать свой собственный предикат завершения, основанный на вашей логике.

UPD: думаю ссылка на документацию временно недоступна

person c0ld    schedule 20.03.2020
comment
Извините, но я не могу понять, как этот ответ связан с вопросом. Мне кажется, что вопрос в том, как одновременно загружать файлы с SFTP, но этот ответ о том, как архивировать пакет файлов или что-то в этом роде. - person ampofila; 06.11.2020