FastAPI - это бэкэнд-фреймворк для Python, точно так же, как Flask или Django (больше Flask, чем Django). Эта структура упрощает создание и тестирование конечных точек, поскольку абстрагирует множество сложных частей REST API. У этого есть свои плюсы и минусы, в основном простота использования в обмен на возможность настройки. Весь код для бэкенда можно найти здесь. Весь код проекта можно найти здесь.

Что такое REST API?

API (интерфейс прикладного программирования) - это интерфейс, который определяет, сколько одна программа может взаимодействовать с другой программой, на основе правил и определений (источник). API REST (репрезентационная передача состояния) - это стиль или руководство для написания веб-сервисов. Другими словами, Rest API - это программное обеспечение, специально разработанное для общения и взаимодействия в сети (источник).

В этом проекте мы разделим наше приложение на две части: Backend и Frontend. Интерфейс - это то, как пользователь будет взаимодействовать с нашим приложением, кнопками, текстом, который видит пользователь, и т. Д. Это мы сделаем с помощью Svelte. Бэкэнд выполняет работу по обработке взаимодействия между пользователем и интерфейсом, он обрабатывает данные, и в нашем случае это тот, кто взаимодействует с нашей моделью рекомендаций. В качестве примера мы можем использовать торговый автомат, интерфейс - это кнопка, на которой вы выбираете свой продукт, слот для монет и экран, который сообщает вам, сколько денег вы добавили. Бэкэнд - это система, которая считает деньги, двигатели, которые дают вам ваш продукт, которая, среди прочего, рассчитывает ваши изменения.

Еще нам нужна база данных

Итак, чтобы сохранить наши данные и другие вещи, мы собираемся использовать MongoDB, потому что я хотел использовать механизм базы данных на основе документов. Для установки базы данных вы можете использовать следующий учебник, но это не обязательно, поскольку мы будем использовать образ Docker для упрощения работы. Вам нужно установить MongoDB только в том случае, если вы собираетесь тестировать систему без Docker. Не стесняйтесь изменять код этого руководства, чтобы использовать любой движок базы данных, который вам нравится.

Давайте сделаем REST API

Приступим к созданию виртуальной среды для нашего бэкэнда. Для этого мы будем использовать virtualenv (посмотрите этот учебник о том, что такое и как использовать виртуальные среды в Python). Вот зависимости для этого скрипта:

fastapi[all]
pymongo
pandas
requests
uvicorn

Вы можете сохранить их в файле с именем requirements.txt и использовать команду pip install -r requirements.txt или установить каждую зависимость отдельно.

Начнем с подключения нашего приложения к нашей базе данных MongoDB. Поскольку мы собираемся использовать Docker для объединения наших разных систем, при запуске сценария будут вещи, которые не будут работать, и это нормально. Для проверки есть закомментированные параметры, которые вы можете не комментировать и комментировать параметр с тем же именем.

from fastapi import FastAPI, BackgroundTasks, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from pymongo import MongoClient
import pandas as pd
import json
import uvicorn
import uuid
from bson import json_util
from typing import List
import requests
import numpy as np
import os
import time
# MONGO_HOST = "localhost"
MONGO_HOST = os.getenv("MONGOHOST")
# TF_SERVING = "localhost"
TF_SERVING = os.getenv("TF_SERVING_HOST")
mongo_client = MongoClient(MONGO_HOST, 27017)
db = mongo_client["MovieRecommenderDB"]
movie_col = db["movies"]
user_col = db["user"]
status_col = db["status"]
ratings_col = db["ratings"]
movie_encoded_col = db["movieEncoding"]

Игнорируя импорт, мы просто подключаемся к нашему экземпляру MongoDB, выбирая нашу базу данных с именем MovieRecommenderDB и наши коллекции. Если вы раньше не создавали свою базу данных или коллекции, не волнуйтесь, MongoDB позаботится об этом.

Теперь приступим к созданию нашего приложения FastAPI.

app = FastAPI()
origins = [
    "*"
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

В этой паре строк нам есть что распаковать. Во-первых, app - это экземпляр FastAPI, это минимум, необходимый для создания нашего API. Следующие строки предназначены для того, чтобы наш интерфейс мог взаимодействовать с нашим сервером. Чтобы узнать больше о CORS, вы можете прочитать здесь и / или здесь.

Предположим, что база данных будет обновляться с помощью нашего API, поэтому давайте создадим логику, чтобы сделать это доступным.

def get_number_of(collection):
    num_of_movies = collection.find({}).count()
    return num_of_movies
def save_to_db(csv_file, file_size, jobId, collection):
    _index = get_number_of(collection)
    csv_file.file.seek(0)
    for i, l in enumerate(csv_file.file):
        pass
    csv_file.file.seek(0)
    headers = csv_file.file.readline().decode().replace(";\\n", "").split(";")
    for j, l in enumerate(csv_file.file):
        line = l.decode()
        line = line.replace(";\\n", "")
        row_elem = line.split(";")
        if len(row_elem) > len(headers):
            job_doc = {"jobId": jobId,
                "status": "Error",
                "percentage": int((j/i)*100),
                "reason": f"Ilegal Character in line {j}"}
            status_col.update_one({"jobId": jobId}, {"$set": job_doc})
        else:
            doc = {}
            for e in range(len(row_elem)):
                doc[headers[e]] = row_elem[e]
            doc["index"] = _index + j
            if collection.find_one(doc) is None: 
                collection.insert_one(doc)
            else:
                pass
            status_col.update_one({"jobId": jobId}, {"$set": {"percentage": int((j/i)*100)}})
    status_col.update_one({"jobId": jobId}, {"$set": {"percentage": 100, "status": "complete", "fileName": csv_file.filename, "fileSize": file_size, "numOfRows": i}})
            
def make_movie_encoding():
    unique_movies = ratings_col.distinct("movieId")
    movie_encoder = {x: i for i, x in enumerate(unique_movies)}
    for key, item in movie_encoder.items():
        doc = {"movieId": key, "index": item}
        movie_encoded_col.insert_one(doc)
def save_ratings_to_db(csv_file, file_size, jobId):
    save_to_db(csv_file, file_size, jobId, ratings_col)
    make_movie_encoding()
@app.get("/status/{jobId}")
def get_status_bulk_update(jobId: str):
    job_doc = status_col.find_one({"jobId": jobId}, {'_id': False})
    job_doc = json.loads(json_util.dumps(job_doc))
    return job_doc
def insert_status(doc):
    status_col.insert_one(json.loads(json.dumps(doc)))
@app.post("/movies/bulk_update")
def bulk_update_movie_database(background_task: BackgroundTasks, csv_file: UploadFile = File(...)):
    jobId = str(uuid.uuid4())
    csv_file.file.seek(0, 2)
    file_size = csv_file.file.tell()/1000
    job_doc = {"jobId": jobId,
                "status": "inProgress",
                "percentage": 0}
    insert_status(job_doc)
    background_task.add_task(save_to_db, csv_file, file_size, jobId, movie_col)
    return {"filename": csv_file.filename,
            "file_size": file_size,
            "job": job_doc}
@app.post("/ratings/bulk_update")
def bulk_update_rating_database(background_task: BackgroundTasks, csv_file: UploadFile = File(...)):
    jobId = str(uuid.uuid4())
    csv_file.file.seek(0, 2)
    file_size = csv_file.file.tell()/1000
    job_doc = {"jobId": jobId,
                "status": "inProgress",
                "percentage": 0}
    insert_status(job_doc)
    background_task.add_task(save_ratings_to_db, csv_file, file_size, jobId)
    return {"filename": csv_file.filename,
            "file_size": file_size,
            "job": job_doc}

В этом коде мы начнем с функций bulk_update_rating_database, bulk_update_movie_database и get_status_bulk_update. Как видите, над всеми этими функциями вы можете найти декоратор python @app.get или @app.post. Этот декоратор позволяет нам создавать URL-пути или конечные точки. Конечные точки не могут быть одной из многих, но в большинстве случаев вы используете четыре GET, POST, UPDATE и DELETE. Для конвейера данных мы используем POST для обновления нашей базы данных новыми рейтингами или новым фильмом. Мы используем метод GET для проверки статуса задачи

Одна из замечательных особенностей FastAPI - это то, что называется BackgroundTasks. Вы можете указать FastAPI отправить некоторую работу, например, предварительную обработку данных, для выполнения в фоновом режиме и предоставить пользователю возможность проверить статус своей задачи.

Теперь давайте перейдем к вызову нашей модели, чтобы дать рекомендации.

def find_movies_by_ids(id_list):
    title_list = []
    for id in id_list:
        movie_title = movie_col.find_one({"movieId": str(id)})
        if movie_title is not None:
            title_list.append(movie_title["title"])
        else:
            pass
    return title_list
def get_recomendation(movie_ids):
    movie_array = np.hstack(([[0]]*len(movie_ids), movie_ids))
    body = {"instances": movie_array.tolist()}
    url = f"http://{TF_SERVING}:8501/v1/models/movie_model:predict"
    response = requests.request("POST", url, data=json.dumps(body))
    aux = response.json()
    return aux
def get_movie_index(movieIds):
    movie_indexs = []
    for movie in movieIds:
        movie_doc = movie_col.find_one({"movieId": str(movie)})
        if movie_doc is not None:
            movie_indexs.append([movie_doc["index"]])
        else:
            pass
    return movie_indexs
def encode_movieIds(movieIds):
    encoded_movies = []
    for movie in movieIds:
        doc = movie_encoded_col.find_one({"movieId": str(movie)})
        if doc is not None:
            encoded_movies.append([doc["index"]])
        else:
            pass
        
    return encoded_movies
def find_movies_not_watched(movieIndexs):
    indexs_to_watch = []
    movies_to_watch = movie_col.find({"$nor": [{"movieId": {"$in": movieIndexs}}]})
    indexs_to_watch = [int(x["movieId"]) for x in movies_to_watch]
    return indexs_to_watch
def clean_up_recommendations(recommendation_scores, top_indexes):
    recommendation_body = []
    for index in top_indexes:
        movieId = movie_encoded_col.find_one({"index": int(index)}, {'_id': False})["movieId"]
        movieDoc = movie_col.find_one({"movieId": movieId}, {'_id': False})
        movieScore = recommendation_scores[index]
        body = {
            "title": movieDoc["title"],
            "genre": movieDoc["genres"],
            "movieScore": movieScore[0]
        }
        recommendation_body.append(body)
    return recommendation_body
def generate_recommendations(movieIds, jobId):
    start = time.time()
    movieIds = [str(x) for x in movieIds]
    still_to_watch_indxs = find_movies_not_watched(movieIds)
    encoded_movies = encode_movieIds(still_to_watch_indxs)
    recommendation = get_recomendation(encoded_movies)
    recomender_scores = np.array(recommendation["predictions"]).flatten()
    top_recommendations_indx  = np.array(recomender_scores).argsort()[-10:][::-1]
    recommendations = clean_up_recommendations(recommendation["predictions"], top_recommendations_indx)
    inputMovieTitles = find_movies_by_ids(movieIds)
    end = time.time()
    timeTaken = end-start
    status_col.update_one({"jobId": jobId}, {"$set": {"status": "complete", "input": inputMovieTitles, "recommendation": recommendations, "timeTaken": timeTaken}})
@app.post("/movie/make_recom")
def make_recomendation(movies: List, background_task: BackgroundTasks):
    jobId = str(uuid.uuid4())
    job_doc = {"jobId": jobId,
                "status": "inProgress"
                }
    insert_status(job_doc)
    background_task.add_task(generate_recommendations, movies, jobId)
    
    return job_doc
@app.get("/autocomplete")
def get_autocomplete_movies():
    movie_all = movie_col.find()
    data = {}
    for doc in movie_all:
        data[doc["title"]] = doc["movieId"]
    return data
if __name__ == "__main__":
    uvicorn.run("main:app", host="127.0.0.1", port=8000, log_level="info")

Суть этих функций более или менее проста. Он начинается с получения списка идентификаторов фильмов, затем мы фильтруем все фильмы, которых нет в этом списке. Затем мы вызываем модель с помощью HTTP-запроса на наш сервер обслуживания Tensorflow. Мы получаем список вероятностей, что мы получим 10 показателей с наибольшей вероятностью. Наконец, мы преобразуем эти индексы обратно в фильмы и возвращаемся к пользователю, используя нашу конечную точку состояния.

Как это проверить?

Одна из замечательных особенностей FastAPI - автоматическая документация с использованием Swagger. Вы можете запустить этот сценарий на своем компьютере, зайти в веб-браузер по следующему адресу: http://localhost:8000/docs и вы должны увидеть что-то вроде этого:

Здесь вы можете попробовать свои конечные точки, некоторые из них могут не работать из-за внешних зависимостей, таких как MongoDB или Tensorflow Serving. Не волнуйтесь, потому что мы свяжем все вместе в части этого руководства, посвященной Docker.