Apache camel sftp org.apache.camel.NoTypeConversionAvailableException: нет преобразователя типов для преобразования java.io.InputStream

В настоящее время я застрял с загрузкой файлов с ftp-сервера из-за преобразования типов.

Таким образом, упомянутый ниже маршрут вызывается из конвейера, у которого уже есть мой пользовательский POJO в качестве тела. А пользовательское pojo такое, как показано ниже:

public class DirectoryLocationListing {

    private String domainName;

    private String countryCode;

    private String productName;

    private String profileId;

    private String directoryLocation;

И маршрут для sftp выглядит следующим образом: я извлекаю все файлы из каталога определенного сервера и сохраняю их на пути к локальному файлу.

 <route id="route5">
        <from uri="direct:sftpGetCDRs"/>
        <from uri="sftp://{{HOST}}//home/admin/ROSS_PULL_CDR/resellers/${body.directoryLocation}/?noop=true&amp;streamDownload=true&amp;username={{USER}}&amp;password={{PASSWD}}"/>
        <to uri="file://{{DB_DIR_LOC}}/temp/?fileName=${body.directoryLocation}/&amp;autoCreate=true"/>
 </route>

Во время запуска приложения я сталкиваюсь с исключением, ниже которого, как мне кажется, я не должен сталкиваться, поскольку запись from должна собирать файлы и хранить их в разделе to, однако раздел to, в который я помещаю файл, кажется например, вместо файлов, взятых с ftp-сервера, делается ссылка на тело, взятое на маршруте.

---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[_route1           ] [_route1           ] [quartz2://spring?cron=0+*+*+%3F+*+*                                           ] [       121]
[_route2           ] [_choice1          ] [when[simple{${body.domainName} != null and ${body.countryCode} !=
                                null] [         6]
[_route2           ] [_to4              ] [bean:pullCDRProcessorWithFTP?method=createTemporaryDirectoryWithDirLoc        ] [         1]
[_route2           ] [_log6             ] [log                                                                           ] [         1]
[_route2           ] [_to4              ] [direct:sftpGetCDRs                                                            ] [         3]
[route5            ] [to1               ] [file://C:/example/export/home/resellerid/wmcdr/cdr/temp/?fileName=${body.director] [         2]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
    Id                  ID-IGL70051-59483-1550660613598-0-7
    ExchangePattern     InOnly
    Headers             {breadcrumbId=ID-IGL70051-59483-1550660613598-0-1, calendar=null, CamelFileName=null, CamelRedelivered=false, CamelRedeliveryCounter=0, fireTime=Wed Feb 20 16:34:00 IST 2019, jobDetail=JobDetail 'Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f.spring':  jobClass: 'org.apache.camel.component.quartz2.CamelJob concurrentExectionDisallowed: false persistJobDataAfterExecution: false isDurable: false requestsRecovers: false, jobInstance=org.apache.camel.component.quartz2.CamelJob@65314fe3, jobRunTime=-1, mergedJobDataMap=org.quartz.JobDataMap@f7fbba19, nextFireTime=Wed Feb 20 16:35:00 IST 2019, previousFireTime=null, refireCount=0, result=null, scheduledFireTime=Wed Feb 20 16:34:00 IST 2019, scheduler=org.quartz.impl.StdScheduler@59c9a1bf, trigger=Trigger 'Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f.spring':  triggerClass: 'org.quartz.impl.triggers.CronTriggerImpl calendar: 'null' misfireInstruction: 1 nextFireTime: Wed Feb 20 16:35:00 IST 2019, triggerGroup=Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f, triggerName=spring}
    BodyType            com.example.cdr.model.DirectoryLocationListing
    Body                com.example.cdr.model.DirectoryLocationListing@23a2ecf4
]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot store file: C:\EXAMPLE\export\home\resellerid\wmcdr\cdr\temp\IN\healthyindia\apj\SERVICEID
    at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:292)
    at org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:277)
    at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:165)
    at org.apache.camel.component.file.GenericFileProducer.process(GenericFileProducer.java:79)
    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:51)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
    at org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:111)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:590)
    at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:518)
    at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:227)
    at org.apache.camel.processor.Splitter.process(Splitter.java:104)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:51)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)
    at org.apache.camel.processor.MulticastProcessor.doProcessParallel(MulticastProcessor.java:736)
    at org.apache.camel.processor.MulticastProcessor.access$200(MulticastProcessor.java:83)
    at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:304)
    at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:289)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.camel.InvalidPayloadException: No body available of type: java.io.InputStream but has value: com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2 of type: com.EXAMPLE.cdr.model.DirectoryLocationListing on: Message: com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2. Caused by: No type converter available to convert from type: com.EXAMPLE.cdr.model.DirectoryLocationListing to the required type: java.io.InputStream with value com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2. Exchange[Message: com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: com.EXAMPLE.cdr.model.DirectoryLocationListing to the required type: java.io.InputStream with value com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2]
    at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:101)
    at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:273)
    ... 52 more
Caused by: org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: com.EXAMPLE.cdr.model.DirectoryLocationListing to the required type: java.io.InputStream with value com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2
    at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:177)
    at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:99)

Пожалуйста, помогите, если у кого-то возникла такая же проблема. Таким образом, может потребоваться преобразование типов, но я не вижу, что это необходимо в данном случае.

**Обновлять **

После использования pollenrich мои маршруты теперь выглядят следующим образом:

 <route id="route5">
        <from uri="direct:sftpGetCDRs"/>
        <process ref="sftpGetDirLocation"/>
        <log message="property ${exchangeProperty.ftpGetDirectory}"/>
        <pollEnrich uri="sftp://{{HOST}}//home/admin/ROSS_PULL_CDR/resellers/${exchangeProperty.ftpGetDirectory}/?noop=true&amp;streamDownload=true&amp;username={{USER}}&amp;password={{PASSWD}}" timeout="0"/>
        <to uri="file://{{DB_DIR_LOC}}/temp/?fileName=${body.directoryLocation}/&amp;autoCreate=true"/>
        </route>

И процессор, о котором я говорю, это

@Component("sftpGetDirLocation")
public class SFTPGetDirLocation implements Processor{

    public void process(Exchange exchange) throws Exception {
        exchange.setProperty("ftpGetDirectory", exchange.getIn().getBody(DirectoryLocationListing.class).getDirectoryLocation());
    }

}

Однако я предполагаю, что после этого изменения pollEnrich сохранит все (предположительно файлы), полученные от URI для sftp, и сохранит его как тело. Вместо этого я вижу, что тело пустое

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[_route1           ] [_route1           ] [quartz2://spring?cron=0+*+*+%3F+*+*                                           ] [       657]
[_route2           ] [_choice1          ] [when[simple{${body.domainName} != null and ${body.countryCode} !=
                                null] [       572]
[_route2           ] [_to4              ] [bean:pullCDRProcessorWithFTP?method=createTemporaryDirectoryWithDirLoc        ] [         2]
[_route2           ] [_log6             ] [log                                                                           ] [         7]
[_route2           ] [_to4              ] [direct:sftpGetCDRs                                                            ] [       523]
[route5            ] [process1          ] [ref:sftpGetDirLocation                                                        ] [         0]
[route5            ] [log1              ] [log                                                                           ] [         1]
[route5            ] [pollEnrich1       ] [pollEnrich[sftp://10.91.142.11//home/admin/ROSS_PULL_CDR/resellers/${exchangeP] [       514]
[route5            ] [log2              ] [log                                                                           ] [         0]
[route5            ] [to1               ] [file://C:/EXAMPLE/export/home/resellerid/wmcdr/cdr/temp/?fileName=${body.director] [         0]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
    Id                  ID-IGL70051-57920-1550721464331-0-4
    ExchangePattern     InOnly
    Headers             {breadcrumbId=ID-IGL70051-57920-1550721464331-0-1, calendar=null, CamelFileName=null, CamelRedelivered=false, CamelRedeliveryCounter=0, CamelToEndpoint=sftp://10.91.142.11//home/admin/ROSS_PULL_CDR/resellers/$%7BexchangeProperty.ftpGetDirectory%7D/?noop=true&password=tcpip123&streamDownload=true&username=admin, fireTime=Thu Feb 21 09:28:00 IST 2019, jobDetail=JobDetail 'Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f.spring':  jobClass: 'org.apache.camel.component.quartz2.CamelJob concurrentExectionDisallowed: false persistJobDataAfterExecution: false isDurable: false requestsRecovers: false, jobInstance=org.apache.camel.component.quartz2.CamelJob@363935fd, jobRunTime=-1, mergedJobDataMap=org.quartz.JobDataMap@f7fbba19, nextFireTime=Thu Feb 21 09:29:00 IST 2019, previousFireTime=null, refireCount=0, result=null, scheduledFireTime=Thu Feb 21 09:28:00 IST 2019, scheduler=org.quartz.impl.StdScheduler@5bf7fd08, trigger=Trigger 'Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f.spring':  triggerClass: 'org.quartz.impl.triggers.CronTriggerImpl calendar: 'null' misfireInstruction: 1 nextFireTime: Thu Feb 21 09:29:00 IST 2019, triggerGroup=Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f, triggerName=spring}
    BodyType            null
    Body                [Body is null]
]

Таким образом, исключение составляет

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot write null body to file: C:\EXAMPLE\export\home\resellerid\wmcdr\cdr\temp
    at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:205)

person Parul Chauhan    schedule 20.02.2019    source источник


Ответы (2)


На мой взгляд, в коде есть три проблемы.

  1. Вы отправляете обмен на route5 с телом типа DirectoryLocationListing и ожидаете, что компонент sftp будет использовать это сообщение со вторым from определением. Это не сработало. Когда компонент SFTP загружает файлы, тело обмена будет заменено реальным файлом (типа org.apache.camel.component.file.GenericFile), и вы потеряете объект DirectoryLocationListing, который был у вас в предыдущем обмене. Content Enriher EIP - хороший вариант, если ваша камера pojo установлена ​​в другом месте, кроме тело.

  2. В текущей ситуации в конце маршрута вы ожидаете файл от конечной точки sftp, но на самом деле вам доставляется обмен с телом типа DirectoryLocationListing (ваш компонент SFTP не работал). Верблюд не может сделать это волшебство и сохранить его на диске. Именно на это он жалуется.

    2.1. Как только вы исправите эту проблему (и ваш компонент SFTP начнет выполнять свою работу), вы столкнетесь со следующей ситуацией, когда сообщение, доставляемое на вашу to конечную точку, теперь является файлом, вам нужно как-то сохранить directoryLocation в обмене до конца. маршрута.

  3. Параметр fileName в вашем определении to указывает на каталог. Это не сработает. Вам нужно будет использовать определение динамической конечной точки

Я не знаком с XML DSL Camel. Итак, я покажу, как это можно сделать с помощью Java DSL. Я буду избегать SFTP для пивоварения и вместо этого буду использовать file компонент. Мы постараемся исправить все эти проблемы сразу.

Определение маршрута

from("direct:test")
            .routeId("route5")
            .log("=> body.directoryLocation is: ${body.directoryLocation}") //Just to see if we can read the directoryLocation property from the POJO
            .process(exchange -> exchange.setProperty("WriteTargetDirectory",exchange.getIn().getBody(DirectoryListing.class).directoryLocation)) //Move directoryLocation property to an exchange property  named WriteTargetDirectory
            .pollEnrich()
            .simple("file://source/${exchangeProperty.WriteTargetDirectory}/?noop=true")//Poll enrich magic here!
            .log("=> I still have the target directory location as : ${exchangeProperty.WriteTargetDirectory}")
            .log("=> I just received file [${in.header." + Exchange.FILE_NAME + "}] and will write it as [${exchangeProperty.WriteTargetDirectory}/${in.header." + Exchange.FILE_NAME + "}]" )
            .toD("file://destination/${exchangeProperty.WriteTargetDirectory}/");

DirectoryListing класс

public class DirectoryListing {
        String directoryLocation;
        public DirectoryListing(String directoryLocation) {
            this.directoryLocation = directoryLocation;
        }
        public String getDirectoryLocation() {
            return directoryLocation;
        }
}

Код проверки маршрута

template.sendBody("direct:test",new DirectoryListing("IN/healthyindia/"));

Журнал с теста

[                          main] DefaultCamelContext            INFO  Apache Camel 2.23.1 (CamelContext: camel-1) started in 0.203 seconds
[                          main] route5                         INFO  => body.directoryLocation is: IN/healthyindia/
[                          main] FileEndpoint                   INFO  Endpoint is configured with noop=true so forcing endpoint to be idempotent as well
[                          main] FileEndpoint                   INFO  Using default memory based idempotent repository with cache max size: 1000
[                          main] route5                         INFO  => I still have the target directory location as : IN/healthyindia/
[                          main] route5                         INFO  => I just received file [test.txt] and will write it as [destination/IN/healthyindia//test.txt]
[                          main] MainRouteBuilderTest           INFO  ********************************************************************************
[                          main] MainRouteBuilderTest           INFO  Testing done: testRouteBuilder(JustTest.MainRouteBuilderTest)
[                          main] MainRouteBuilderTest           INFO  Took: 0.125 seconds (125 millis)
person ShellDragon    schedule 20.02.2019
comment
Только что понял, что WriteTargetDirectory - плохая репутация. Пожалуйста, используйте что-нибудь значимое - person ShellDragon; 20.02.2019
comment
Спасибо за предложения и корректирующие меры, предоставленные ShellDragon. Я попытался обновить XML DSL контекста верблюда, как тот, который был обновлен в вопросе. Теперь я смотрю на тело с нулевой ошибкой. Значит, содержимое моего тела должно быть нулевым, потому что SFTP URI не работает? или я что-то не так делаю? - person Parul Chauhan; 21.02.2019
comment
Вы можете разрешить запись нулевых тел, установив allowNullBody=true в последнем to, чтобы увидеть, что происходит. У вас случайно есть файлы нулевой длины в исходном каталоге? - person ShellDragon; 21.02.2019
comment
Теперь проблема устранена с помощью параметра consumerDelay = 60000. Похоже, что удаленный сервер не отвечал быстро, и мои свойства также не давали ему достаточно времени для ответа. - person Parul Chauhan; 25.02.2019
comment
Идеально! Рад узнать, что у вас наконец-то получилось. - person ShellDragon; 25.02.2019

В конце концов, у меня сработал следующий контекстный маршрут верблюда под руководством шеллдракона:

 <route id="route5">
            <from id="_from5" uri="direct:sftpGetCDRs"/>
            <process id="_process1" ref="sftpGetDirLocation"/>
            <log id="_log17" message="property ${exchangeProperty.ftpGetDirectory}"/>
            <pollEnrich id="_pollEnrich1" timeout="0" uri="sftp://{{HOST}}//home/admin/ROSS_PULL_CDR/resellers/temp/${body.directoryLocation}/?consumer.delay=60000&amp;username={{USER}}&amp;password={{PASSWD}}"/>
            <log id="_log18" message="${body}"/>
            <to id="_to2" uri="file://{{DB_DIR_LOC}}/temp/?fileName=${exchangeProperty.ftpGetDirectory}&amp;autoCreate=true"/>
        </route>
person Parul Chauhan    schedule 25.02.2019