Можно ли использовать Riak CS с Apache Flink?

Я хочу настроить filesystem серверную часть состояния и zookeeper режим восстановления:

state.backend: filesystem
state.backend.fs.checkpointdir: ???

recovery.mode: zookeeper
recovery.zookeeper.storageDir: ???

Как видите, я должен указать параметры checkpointdir и storageDir, но у меня нет файловых систем, поддерживаемых Apache Flink (например, HDFS или Amazon S3). Но я установил кластер Riak CS (кажется, он совместим с S3).

Итак, могу ли я использовать Riak CS вместе с Apache Flink? Если возможно: как настроить Apache Flink для работы с Riak CS?


person Maxim    schedule 06.04.2016    source источник


Ответы (1)


Ответ: как присоединиться к Apache Flink и Riak CS?

Riak CS имеет интерфейс, совместимый с S3 (версия 2). Итак, для работы с Riak CS можно использовать адаптер файловой системы S3 от Hadoop.

Я не знаю почему, но Apache Flink имеет только часть адаптеров файловой системы Hadoop внутри толстой банки (lib/flink-dist_2.11-1.0.1.jar), то есть имеет файловую систему FTP (org.apache.hadoop.fs.ftp.FTPFileSystem), но не имеет файловой системы S3 (т.е. org.apache.hadoop.fs.s3a.S3AFileSystem). Итак, у вас есть 2 способа решить эту проблему:

  • используйте эти адаптеры из установки Hadoop. Я не пробовал это делать, но, похоже, вам нужно просто настроить переменную evn HADOOP_CLASSPATH или HADOOP_HOME.
  • monky исправляет Apache Flink и загружает необходимые JAR-файлы в <flink home>/lib каталог

Итак, я выбрал второй способ, потому что не хочу предоставлять Hadoop в своей среде. Вы можете скопировать JAR из Hadoop dist или из Интернета:

curl http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.2/hadoop-aws-2.7.2.jar -o /flink/lib/hadoop-aws-2.7.2.jar
curl http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o /flink/lib/aws-java-sdk-1.7.4.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.2.5/httpcore-4.2.5.jar -o /flink/lib/httpcore-4.2.5.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar -o /flink/lib/httpclient-4.2.5.jar

Как видите, я использую старые версии, потому что такая версия используется в Hadoop 2.7.2, и я использую Flink, совместимый с этой версией Hadoop.

К вашему сведению: такой взлом может вызвать проблемы, если вы используете последнюю версию этих JAR в собственном потоке. Чтобы избежать проблем, связанных с разными версиями, вы можете перемещать пакеты при создании толстой банки с потоком, используя что-то вроде (я использую Gradle):

// Relocate org.apache.http packages because Apache Flink include old version of this library (we place them for using S3 compatible FS)
shadowJar {
    dependencies {
        include(dependency('.*:.*:.*'))
    }

    relocate 'org.apache.http', 'relocated.org.apache.http'
    relocate 'org.apache.commons', 'relocated.org.apache.commons'
}

Затем вы должны указать путь к core-site.xml в flink-conf.yaml, потому что файловые системы, совместимые с Hadoop, используют эту конфигурацию для загрузки настроек:

...
fs.hdfs.hadoopconf: /flink/conf
...

Как видите, я просто помещаю его в каталог <fink home>/conf. Имеет следующие настройки:

<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
    <property>
        <name>fs.s3a.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> // because S3A better then other: https://wiki.apache.org/hadoop/AmazonS3
    </property>
    <property>
        <name>fs.s3a.endpoint</name>
        <value>my-riak-cs.stage.local</value>  // this is my Riak CS host
    </property>
    <property>
        <name>fs.s3a.connection.ssl.enabled</name> // my Riak CS in staging doesn't support SSL
        <value>false</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value>????</value> // this is my access key for Riak CS
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>????</value> // this is my secret key for Riak CS
    </property>
</configuration>

Затем вы должны настроить корзины Riak CS в flink-conf.yaml как рекомендатель здесь:

...
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
...
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
...

и создавать корзины в Riak CS. Я использую s3cmd (установлен поверх brew в моем окружении для разработчиков OS X):

s3cmd mb s3://example-staging-flink

К вашему сведению: перед использованием s3cmd вы должны настроить его на использование s3cmd --configure, а затем исправить некоторые настройки в файле ~/.s3cmd:

signature_v2 = True // because Riak CS using S3 V2 interface
use_https = False // if your don't use SSL
access_key = ???
secret_key = ???
host_base = my-riak-cs.stage.local // your Riak CS host
host_bucket = %(bucket).my-riak-cs.stage.local // format of bucket used by Riak CS

Итак, это все, что вам нужно настроить для сохранения / восстановления состояния автономного кластера HA Apache Flink в Riak CS.

person Maxim    schedule 15.04.2016