Раковина Flink Jdbc

Я создал приложение, в котором я читаю данные из потоков Kinesis и сохраняю их в таблице mysql.

Я попробовал загрузить приложение. На 100к работ требуется более 3 часов. Любое предположение, почему это происходит так медленно. Еще одна вещь: первичный ключ моей таблицы состоит из 7 столбцов.

Я использую спящий режим для хранения POJO непосредственно в базе данных.

код:

public class JDBCSink extends RichSinkFunction<CompetitorConfig> {
    private static SessionFactory sessionFactory;
    private static StandardServiceRegistryBuilder serviceRegistry;
    private Session session;
    private  String username;
    private  String password;
    private static final Logger LOG = LoggerFactory.getLogger(CompetitorConfigJDBCSink.class);

    public JDBCSink(String user, String pass) {
        username = user;
        password = pass;
    }

    public static void configureHibernateUtil(String username, String password) {
        try {

            Properties prop= new Properties();
            prop.setProperty("hibernate.dialect", "org.hibernate.dialect.MySQLDialect");
            prop.setProperty("hibernate.connection.driver_class", "com.mysql.cj.jdbc.Driver");
            prop.setProperty("hibernate.connection.url", "url");
            prop.setProperty("hibernate.connection.username", username);
            prop.setProperty("hibernate.connection.password", password);
            org.hibernate.cfg.Configuration configuration = new org.hibernate.cfg.Configuration().addProperties(prop);
            configuration.addAnnotatedClass(CompetitorConfig.class);
            serviceRegistry = new StandardServiceRegistryBuilder().applySettings(configuration.getProperties());
            sessionFactory = configuration.buildSessionFactory(serviceRegistry.build());
        } catch (Throwable ex) {
             throw new ExceptionInInitializerError(ex);
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {

        configureHibernateUtil(username,password);
        this.session = sessionFactory.openSession();
    }

    @Override
    public void invoke(CompetitorConfig value) throws Exception {
        Transaction transaction = null;
        try {
            transaction = session.beginTransaction();
            session.merge(value);
            session.flush();

        } catch (Exception e) {
             throw e;
        } finally {
            transaction.commit();
        }
    }

    @Override
    public void close() throws Exception {
           this.session.close();
            sessionFactory.close();
    }
}


person user7665394    schedule 05.03.2021    source источник


Ответы (1)


Это медленно, потому что вы пишете каждую запись отдельно, обернутую в отдельную транзакцию. Высокопроизводительный приемник базы данных будет выполнять буферизацию, массовую запись и фиксировать транзакции как часть контрольной точки.

Если вам нужны ровно однократные гарантии и вас устраивает семантика upsert, вы можете использовать существующий приемник JDBC FLINK. Если вам требуется двухэтапная фиксация, она уже была объединена с мастером и будет включена в Flink 1.13. См. FLINK-15578.

Обновлять:

Для upsert нет стандартного синтаксиса SQL; вам нужно будет выяснить, поддерживает ли это ваша база данных и каким образом. Например:

MySQL:

INSERT ... ON DUPLICATE KEY UPDATE ...

PostgreSQL:

INSERT ... ON CONFLICT ... DO UPDATE SET ...

Как бы то ни было, подобные приложения обычно проще реализовать с помощью Flink SQL. В этом случае вы должны использовать Kinesis соединитель таблицы вместе с Соединитель таблицы JDBC.

person David Anderson    schedule 05.03.2021
comment
Привет, @David, я использую Flink 1.11, и мне не удалось найти поддержки для хранения POJO непосредственно в JDBC. Было бы здорово, если бы вы подвели меня к правильному пути сделать то же самое. - person user7665394; 05.03.2021
comment
Что ты пробовал? Вы смотрели ci.apache? org / projects / flink / flink-docs-release-1.11 / dev /? - person David Anderson; 05.03.2021
comment
Я не пробовал, потому что нам нужно вручную писать запрос. Я попробую это. Можете ли вы сказать, как мы достигаем семантики upsert с помощью такого запроса. - person user7665394; 05.03.2021
comment
Спасибо большое за вашу помощь. - person user7665394; 05.03.2021