Джордж Джен, Jen Tek LLC

В письме есть следующие компоненты:

· Апачская стрела

· Полет стрелы Апача

· Дремио сервер

· Полетный коннектор Dremio

· Машинное обучение Apache Spark.

Давайте перечислим все компоненты в письме:

Стрелка Apache:

Apache Arrow — это кросс-языковая платформа для разработки данных в памяти. Он определяет стандартизированный независимый от языка столбцовый формат памяти для плоских и иерархических данных, организованный для эффективных аналитических операций на современном оборудовании. Он также предоставляет вычислительные библиотеки и потоковую передачу сообщений с нулевым копированием и межпроцессное взаимодействие. В настоящее время поддерживаются следующие языки: C, C++, C#, Go, Java, JavaScript, MATLAB, Python, R, Ruby и Rust.

Apache Arrow имеет столбчатую структуру данных в памяти, что позволяет приложениям избегать ненужных операций ввода-вывода и повышать производительность аналитической обработки на современных ЦП и ГП.

Обычно это способ обмена данными между различными системами:

С очевидными недостатками:

· Каждая система имеет свой формат внутренней памяти

· 70–80% вычислений тратится на сериализацию и десериализацию

· Аналогичная функциональность реализована в нескольких проектах

С Apache Arrow в качестве общих структур данных:

С очевидными преимуществами, когда речь идет о передаче данных между различными системами, использующими инфраструктуру структуры данных памяти Arrow:

· Все системы используют один и тот же формат памяти

· Отсутствие накладных расходов на межсистемное взаимодействие

· Проекты могут иметь общие функции (например, считыватель Parquet-to-Arrow)

Apache Arrow Flight

Платформа для быстрой передачи данных

Первоначально Flight ориентирован на оптимизированную передачу столбцового формата Arrow (т. е. «пакетов записей Arrow») через gRPC, популярную универсальную RPC-библиотеку и инфраструктуру Google на основе HTTP/2. Хотя мы сосредоточились на интеграции с gRPC, в качестве среды разработки Flight не предназначен исключительно для gRPC.

Одной из самых важных особенностей, которая отличает Flight от других платформ передачи данных, является параллельная передача, позволяющая одновременно передавать данные в поток или из кластера серверов. Это позволяет разработчикам легче создавать масштабируемые службы данных, которые могут обслуживать растущую клиентскую базу.

Простая установка Flight может состоять из одного сервера, к которому подключаются клиенты и делают запросы DoGet.

Основы полета

Библиотеки Arrow Flight предоставляют среду разработки для реализации службы, которая может отправлять и получать потоки данных. Сервер Flight поддерживает несколько основных типов запросов:

· Рукопожатие: простой запрос для определения авторизации клиента и, в некоторых случаях, для установления определяемого реализацией токена сеанса для использования в будущих запросах.

· ListFlights: возвращает список доступных потоков данных

· GetSchema: вернуть схему для потока данных

· GetFlightInfo: возвращает «план доступа» к интересующему набору данных, возможно, требуя использования нескольких потоков данных. Этот запрос может принимать пользовательские сериализованные команды, содержащие, например, конкретные параметры вашего приложения.

· DoGet: отправить поток данных клиенту

· DoPut: получение потока данных от клиента

· DoAction: выполнить действие, зависящее от реализации, и вернуть любые результаты, т. е. вызов обобщенной функции.

· ListActions: возвращает список доступных типов действий

Также воспользуйтесь элегантной поддержкой «двунаправленной» потоковой передачи gRPC (построенной поверх потоковой передачи HTTP/2), чтобы клиенты и серверы могли одновременно отправлять данные и метаданные друг другу во время обслуживания запросов.

Дремио



Механизм запросов Dremio построен на Apache Arrow, который представляет собой столбчатую структуру данных в памяти. Его механизм SQL позволяет вам использовать SQL для запроса структурированных данных, таких как таблицы реляционной базы данных, или неструктурированных данных, таких как сущности пар ключ-значение, такие как JSON, это распределенный / кластеризованный механизм запросов в столбцах памяти, который может работать на одном узле. или много узлов.

Dremio Flight Connector

Dremio Flight Connector — это реализация Apache Arrow Flight Framework, которая позволяет клиенту, например программе Java или скрипту Python, запрашивать данные с сервера Dremio с использованием протокола Apache Arrow Flight, который наследует структуру данных Apache Arrow для передачи данных.

Это означает, что вы можете получать данные из Dremio или любых систем, использующих Apache Arrow в качестве кеша данных в памяти, без ODBC или JDBC и без накладных расходов на сериализацию и десериализацию, которые возникают в обоих случаях.

Ниже представлена ​​демонстрация интеграции Dremio Data Lake Engine Server, Dremio Flight Connector и Apache Machine Learning Detection Spam Detection.

Настройка:

В моем последнем письме я создал версию сервера Dremio с открытым исходным кодом. Я собираюсь использовать сервер Dremio, уже созданный для написания этой статьи.



Первая задача, мне нужно создать Dremio Flight Connector, который позволяет получать данные от Dremio через Apache Flight, который является протоколом передачи данных памяти Arrow.

(1). Для начала git клонируйте Dremio Flight Connector с GitHub:

https://github.com/dremio-hub/dremio-flight-connector

(2). Затем следуйте инструкциям по сборке в файле readme, убедитесь, что ваша версия mvn обновлена, и просто запустите:

чистая установка mvn

или если у вас нет текущей или последней версии mvn:

чистая установка mvnw

(3). После завершения сборки получите файл jar в целевой папке внутри папки, созданной git, например:

dremio-flight-connector-0.11.0-SNAPSHOT-shaded.jar

и скопируйте его в ‹домашнюю директорию сервера dremio›/jars/

(4). Затем измените ‹домашний каталог сервера dremio›/conf/dremio-env.

DREMIO_JAVA_SERVER_EXTRA_OPTS='-Ddremio.flight.enabled=true -Ddremio.flight.parallel.enabled=true -Ddremio.flight.use-ssl=true -Ddremio.flight.port=47470 -Ddremio.flight.host=0.0 0,0 дюйма

(5). Перезапустите dremio, предположим, что вы находитесь в ‹домашнем каталоге сервера dremio›.

bin/dremio — config ./conf start

(6). Чтобы убедиться, что Dremio Flight Connector запущен и работает:

(i) Проверьте процесс Java, чтобы убедиться, что «-Ddremio.flight.enabled=true» находится внутри командной строки dremio.

пс-эф | греп дремио | java | грэп -в грэп

dremio 5588 1 2 11:46 pts/1 00:05:12 /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.191.b12–1.el7_6.x86_64/jre/bin/java -Djava. util.logging.config.class=org.slf4j.bridge.SLF4JBridgeHandler -Djava.library.path=/opt/dremio/dremio-oss/distribution/server/target/dremio-community-4.1.8–202003120636020140–9c2a6b13/dremio -community-4.1.8–202003120636020140–9c2a6b13/lib -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/opt/dremio/dremio-oss/distribution/server/target/dremio-community-4.1.8–202003120636020140– 9c2a6b13/dremio-community-4.1.8–202003120636020140–9c2a6b13/log/server.gc -Ddremio.log.path=/opt/dremio/dremio-oss/distribution/server/target/dremio-community-4.1.8–202003120636020140 –9c2a6b13/dremio-community-4.1.8–202003120636020140–9c2a6b13/log -Ddremio.plugins.path=/opt/dremio/dremio-oss/distribution/server/target/dremio-community-4.1.8–202003120636020140–13c2a6b dremio-community-4.1.8–202003120636020140–9c2a6b13/plugins -Xmx4096m -XX:MaxDirectMemorySize=8192m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/dremio/dremio-oss/distribution/server/target/dremio-community-4.1.8–202003120636020140–9c2a6b13/dremio-community-4.1.8–202003120636020140–9c2a6b13/log-Dio.netty. maxDirectMemory=0 -DMAPR_IMPALA_RA_THROTTLE -DMAPR_MAX_RA_STREAMS=400 -Ddremio.flight.enabled=true -Ddremio.flight.parallel.enabled=true -Ddremio.flight.use-ssl= true -Ddremio.flight.port=47470 -Ddremio.flight.host=0.0.0.0 … com.dremio.dac.daemon.DremioDaemon

(ii) Проверьте файл dremio server.log, ‹домашняя директория dremio›/log/server.log, найдите следующее:

2020–03–21 17:16:39,835 [main] ИНФОРМАЦИЯ com.dremio.flight.FlightInitializer — настройте плагин полета на порт 47470 и хост 0.0.0.0

(iii) Наконец, чтобы убедиться, что порт 47470 (полетный коннектор) прослушивается Dremio, с помощью telnet:

телнет 10.0.2.15 47470

Попытка 10.0.2.15…

Подключен к 10.0.2.15.

Экранирующий символ «^]».

Apache Spark в качестве клиента сервера Dremio:

Когда все будет готово, вы приступите к работе, чтобы продолжить свою работу по науке о данных.

Набор данных, используемый в этом письме, — это SMSSpamCollection, данные для текстовых сообщений, в которых указано, является ли текст SMS спамом (спамом) или ветчиной (не спамом).

Ниже приводится описание от поставщика этого набора данных:

Коллекция SMS-спама версии 1 (далее — корпус) — это набор помеченных SMS-сообщений, которые были собраны для исследования SMS-спама. Он содержит один набор SMS-сообщений на английском языке из 5 574 сообщений, помеченных в зависимости от того, являются ли они нежелательными (законными) или спамом.

Сначала я поместил файл данных SMSSpamCollection в папку HDFS /dremio.

hdfs dfs -ls /дремио/

Найдено 3 шт.

-rw-r — r — 3 супергруппа hadoop 477907 2020–03–21 11:07 /dremio/SMSSpamCollection

Файл разделен табуляцией (\t).

В пользовательском интерфейсе dremio загрузите и каталогизируйте этот файл как источник данных, чтобы этот файл можно было запрашивать с помощью SQL, как если бы это была таблица.

Как только файл набора данных может быть загружен Dremio, который теперь может предоставлять клиенту, например, скрипт Python с Dremio Flight Connector, без ODBC, следующий код Python делает следующее:

· Импорт pyarrow и полетной библиотеки

· Подключение к Dremio через коннектор Arrow Flight, включая аутентификацию

· Извлечение данных из файла SMSSpamCollection SQL и загрузка в кадр данных Pandas

· Запускает Apache Spark и преобразует фрейм данных Pandas в фрейм данных SparkSQL.

· Проводить обучение и тестирование моделей машинного обучения с помощью Apache Spark ML LogisticRegression и NeiveBayes.

Ниже приведен код Python, который подключается к Dremio с помощью Dremio Flight Connector и выполняет машинное обучение с помощью Apache Spark:

#Импорт необходимой библиотеки Apache Arrow Python для полета

из импортного рейса pyarrow

импортировать pyarrow как pa

#Определить методы аутентификации клиента Dremio

класс HttpDremioClientAuthHandler(flight.ClientAuthHandler):

def __init__(я, имя пользователя, пароль):

super(flight.ClientAuthHandler, self).__init__()

self.basic_auth = Flight.BasicAuth(имя пользователя, пароль)

self.token = Нет

def authentication(self, outgoing, incoming):

auth = self.basic_auth.serialize()

исходящий.write(auth)

self.token = incoming.read()

деф get_token(self):

возвратить self.token

имя пользователя = "Джордж"

пароль = ‘‹отредактировано›’

sql = ‘’’выбрать * из «fraud.dremio».SMSSpamCollection’’’

#Подключитесь к Dremio с помощью разъема Flight Connector на порту 47470, упомянутом ранее в письме.

client = Flight.FlightClient('grpc+tcp://10.0.2.15:47470')

client.authenticate(HttpDremioClientAuthHandler(имя пользователя, пароль))

#передача оператора SQL-запроса в Dremio, выполнение и возврат данных в pandas

#данные pdf

info = client.get_flight_info(flight.FlightDescriptor.for_command(sql))

reader = client.do_get(info.endpoints[0].ticket)

партии = []

импортировать панд как pd

пока верно:

попробовать:

пакет, метаданные = reader.read_chunk()

batches.append(пакет)

кроме StopIteration:

перерыв

data = pa.Table.from_batches(batches)

pdf = data.to_pandas()

pdf.head()

A B

0 ham Иди до точки Джуронг, сумасшедший.. Доступно только …

1 ham Ok lar… Шучу с тобой…

2 spam Бесплатный вход в 2 еженедельных турнира за победу в финале Кубка Англии…

3 ham U dun скажи так рано hor… U c уже тогда скажи…

4 ham Нет, я не думаю, что он ходит в США, он живет около…

импортировать sys,os,os.path

os.environ[‘SPARK_HOME’]=’/opt/spark/’

импортировать matplotlib.pyplot как plt

%matplotlib встроенный

импортировать поиск

findspark.init()

импортировать pyspark

из pyspark.sql импортировать SparkSession

из pyspark.sql импортировать SQLContext

spark = SparkSession.builder.getOrCreate()

sc=spark.sparkContext

#Преобразовать кадр данных pandas в кадр данных SparkSQL

sqlCtx = SQLContext(sc)

df = sqlCtx.createDataFrame(pdf)

df.show(5)

+ — — + — — — — — — — — — — +

| A| B|

+ — — + — — — — — — — — — — +

| ветчина|Идти до juong p…|

| ветчина|Хорошо, лар… Шучу …|

|спам|Бесплатный вход через 2…|

| ветчина|Не скажешь, эрл…|

| ветчина|Нет, я не думаю…|

+ — — + — — — — — — — — — — +

показаны только верхние 5 строк

#Примечание: спам есть спам, с Ханом все в порядке. Переименовать имя столбца A как статус, B как функцию

df = df.withColumnRenamed('A', 'статус').withColumnRenamed('B', 'сообщение')

# Кодировать столбец состояния в числовое значение: ветчину в 1,0 и спам в 0.

# Все наши поля должны быть числовыми для машинного обучения, также переименуйте статус столбца в метку

df.createOrReplaceTempView(‘temp’)

df = spark.sql('выберите статус обращения, когда "ham", затем 1.0, иначе 0 заканчивается как метка, сообщение от temp')

df.show()

+-----+--------------------+
|label|             message|
+-----+--------------------+
|  1.0|Go until jurong p...|
|  1.0|Ok lar... Joking ...|
|  0.0|Free entry in 2 a...|
|  1.0|U dun say so earl...|
|  1.0|Nah I don't think...|
|  0.0|FreeMsg Hey there...|
|  1.0|Even my brother i...|
|  1.0|As per your reque...|
|  0.0|WINNER!! As a val...|
|  0.0|Had your mobile 1...|
|  1.0|I'm gonna be home...|
|  0.0|SIX chances to wi...|
|  0.0|URGENT! You have ...|
|  1.0|I've been searchi...|
|  1.0|I HAVE A DATE ON ...|
|  0.0|XXXMobileMovieClu...|
|  1.0|Oh k...i'm watchi...|
|  1.0|Eh u remember how...|
|  1.0|Fine if that’s th...|
|  0.0|England v Macedon...|
+-----+--------------------+
only showing top 20 rows
#1 is OK, 0 is Spam
#Tokenize the messages Tokenization is the process of taking text (such as a sentence)
# and breaking it into individual terms (usually words). Let’s tokenize the messages
#and create a list of words of each message.
from pyspark.ml.feature import  Tokenizer
tokenizer = Tokenizer(inputCol="message", outputCol="words")
wordsData = tokenizer.transform(df)
wordsData.show(3)
+-----+--------------------+--------------------+
|label|             message|               words|
+-----+--------------------+--------------------+
|  1.0|Go until jurong p...|[go, until, juron...|
|  1.0|Ok lar... Joking ...|[ok, lar..., joki...|
|  0.0|Free entry in 2 a...|[free, entry, in,...|
+-----+--------------------+--------------------+
only showing top 3 rows
#CountVectorizer converts a collection of text documents to vectors of token counts.
from pyspark.ml.feature import CountVectorizer
count = CountVectorizer (inputCol="words", outputCol="rawFeatures")
model = count.fit(wordsData)
featurizedData = model.transform(wordsData)
featurizedData.show(3)
+-----+--------------------+--------------------+--------------------+
|label|             message|               words|         rawFeatures|
+-----+--------------------+--------------------+--------------------+
|  1.0|Go until jurong p...|[go, until, juron...|(13587,[8,43,53,6...|
|  1.0|Ok lar... Joking ...|[ok, lar..., joki...|(13587,[5,76,409,...|
|  0.0|Free entry in 2 a...|[free, entry, in,...|(13587,[0,3,8,22,...|
+-----+--------------------+--------------------+--------------------+
only showing top 3 rows
#IDF reduces the features that often appear in the corpus. When using text as a feature,
#this usually improves performance because the most common,
#and therefore less important, words are weighted down

из pyspark.ml.feature импортировать IDF

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show(3)  #Only needed to train
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(13587,[8,43,53,6...|
|  1.0|(13587,[5,76,409,...|
|  0.0|(13587,[0,3,8,22,...|
+-----+--------------------+
only showing top 3 rows
#Randomly Split DataFrame into 80% Training (trainDF) and 20 Testing (testDF)
seed = 0  # random seed 0
trainDF, testDF = rescaledData.randomSplit([0.8,0.2],seed)
#Logistic regression classifier
#Logistic regression is a common method of predicting classification responses.
#A special case of a generalized linear model is the probability of predicting a result.
#In spark.ml, logistic regression can be used to predict binary results
#by binomial logistic regression, or it can be used to predict multiple types of results by using multiple logistic regression.
#Use the family parameter to choose between these two algorithms, or leave it unset and Spark
#will infer the correct variable.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np
lr = LogisticRegression(maxIter = 100)
model_lr = lr.fit(trainDF)
prediction_lr = model_lr.transform(testDF)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
my_eval_lr = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
my_eval_lr.evaluate(prediction_lr)
0.8734030197444833
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
my_mc_lr = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')
my_mc_lr.evaluate(prediction_lr)
0.9654997463216642
my_mc_lr = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
my_mc_lr.evaluate(prediction_lr)
0.967479674796748
train_fit_lr = prediction_lr.select('label','prediction')
train_fit_lr.groupBy('label','prediction').count().show()
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       1.0|   31|
|  1.0|       1.0|  860|
|  0.0|       0.0|   92|
|  1.0|       0.0|    1|
+-----+----------+-----+
#Naive Bayes Naive Bayesian classifiers are a class of simple probability classifiers
#that apply strong (naive)
#independent assumptions between features based on Bayes' theorem.
#The spark.ml implementation
#currently supports polynomial naive Bayes and Bernoulli Naïve Bayes.
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes()
Model_nb = nb.fit(trainDF)
predictions_nb = Model_nb.transform(testDF)

predictions_nb.select(‘label’, ‘prediction’).show(5)

+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
+-----+----------+
only showing top 5 rows
from pyspark.ml.evaluation import BinaryClassificationEvaluator
my_eval_nb = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
my_eval_nb.evaluate(predictions_nb)
0.937862950058072
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
my_mc_nb = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='f1')
my_mc_nb.evaluate(predictions_nb)
0.933544453535483
my_mc_nb = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
my_mc_nb.evaluate(predictions_nb)
0.9278455284552846

Сводка:

Ниже приведены показатели модели:

ML модель

AUC

F1

Точность

Логистическая регрессия

0.8734030197444833
0.9654997463216642
0.967479674796748

Наивный Байес

0.937862950058072
0.933544453535483
0.9278455284552846

В этом упражнении логистическая регрессия похожа на NaiveBayes. Цель этого письма — продемонстрировать, как использовать Dremio Flight Connector для получения данных с сервера Dremio с протоколом Apache Arrow Flight без ODBC или JDBC для запуска приложений с Apache Spark, таких как машинное обучение с Spark.

Как всегда, код, используемый в этом письме, находится в моем репозитории GitHub.



Спасибо, что уделили время просмотру этого письма.