Ответ: как присоединиться к Apache Flink и Riak CS?
Riak CS имеет интерфейс, совместимый с S3 (версия 2). Итак, для работы с Riak CS можно использовать адаптер файловой системы S3 от Hadoop.
Я не знаю почему, но Apache Flink имеет только часть адаптеров файловой системы Hadoop внутри толстой банки (lib/flink-dist_2.11-1.0.1.jar
), то есть имеет файловую систему FTP (org.apache.hadoop.fs.ftp.FTPFileSystem
), но не имеет файловой системы S3 (т.е. org.apache.hadoop.fs.s3a.S3AFileSystem
). Итак, у вас есть 2 способа решить эту проблему:
- используйте эти адаптеры из установки Hadoop. Я не пробовал это делать, но, похоже, вам нужно просто настроить переменную evn HADOOP_CLASSPATH или HADOOP_HOME.
- monky исправляет Apache Flink и загружает необходимые JAR-файлы в
<flink home>/lib
каталог
Итак, я выбрал второй способ, потому что не хочу предоставлять Hadoop в своей среде. Вы можете скопировать JAR из Hadoop dist или из Интернета:
curl http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.2/hadoop-aws-2.7.2.jar -o /flink/lib/hadoop-aws-2.7.2.jar
curl http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o /flink/lib/aws-java-sdk-1.7.4.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.2.5/httpcore-4.2.5.jar -o /flink/lib/httpcore-4.2.5.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar -o /flink/lib/httpclient-4.2.5.jar
Как видите, я использую старые версии, потому что такая версия используется в Hadoop 2.7.2, и я использую Flink, совместимый с этой версией Hadoop.
К вашему сведению: такой взлом может вызвать проблемы, если вы используете последнюю версию этих JAR в собственном потоке. Чтобы избежать проблем, связанных с разными версиями, вы можете перемещать пакеты при создании толстой банки с потоком, используя что-то вроде (я использую Gradle):
// Relocate org.apache.http packages because Apache Flink include old version of this library (we place them for using S3 compatible FS)
shadowJar {
dependencies {
include(dependency('.*:.*:.*'))
}
relocate 'org.apache.http', 'relocated.org.apache.http'
relocate 'org.apache.commons', 'relocated.org.apache.commons'
}
Затем вы должны указать путь к core-site.xml
в flink-conf.yaml
, потому что файловые системы, совместимые с Hadoop, используют эту конфигурацию для загрузки настроек:
...
fs.hdfs.hadoopconf: /flink/conf
...
Как видите, я просто помещаю его в каталог <fink home>/conf
. Имеет следующие настройки:
<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> // because S3A better then other: https://wiki.apache.org/hadoop/AmazonS3
</property>
<property>
<name>fs.s3a.endpoint</name>
<value>my-riak-cs.stage.local</value> // this is my Riak CS host
</property>
<property>
<name>fs.s3a.connection.ssl.enabled</name> // my Riak CS in staging doesn't support SSL
<value>false</value>
</property>
<property>
<name>fs.s3a.access.key</name>
<value>????</value> // this is my access key for Riak CS
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>????</value> // this is my secret key for Riak CS
</property>
</configuration>
Затем вы должны настроить корзины Riak CS в flink-conf.yaml
как рекомендатель здесь:
...
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
...
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
...
и создавать корзины в Riak CS. Я использую s3cmd
(установлен поверх brew
в моем окружении для разработчиков OS X):
s3cmd mb s3://example-staging-flink
К вашему сведению: перед использованием s3cmd
вы должны настроить его на использование s3cmd --configure
, а затем исправить некоторые настройки в файле ~/.s3cmd
:
signature_v2 = True // because Riak CS using S3 V2 interface
use_https = False // if your don't use SSL
access_key = ???
secret_key = ???
host_base = my-riak-cs.stage.local // your Riak CS host
host_bucket = %(bucket).my-riak-cs.stage.local // format of bucket used by Riak CS
Итак, это все, что вам нужно настроить для сохранения / восстановления состояния автономного кластера HA Apache Flink в Riak CS.
person
Maxim
schedule
15.04.2016