Я создал приложение, в котором я читаю данные из потоков Kinesis и сохраняю их в таблице mysql.
Я попробовал загрузить приложение. На 100к работ требуется более 3 часов. Любое предположение, почему это происходит так медленно. Еще одна вещь: первичный ключ моей таблицы состоит из 7 столбцов.
Я использую спящий режим для хранения POJO непосредственно в базе данных.
код:
public class JDBCSink extends RichSinkFunction<CompetitorConfig> {
private static SessionFactory sessionFactory;
private static StandardServiceRegistryBuilder serviceRegistry;
private Session session;
private String username;
private String password;
private static final Logger LOG = LoggerFactory.getLogger(CompetitorConfigJDBCSink.class);
public JDBCSink(String user, String pass) {
username = user;
password = pass;
}
public static void configureHibernateUtil(String username, String password) {
try {
Properties prop= new Properties();
prop.setProperty("hibernate.dialect", "org.hibernate.dialect.MySQLDialect");
prop.setProperty("hibernate.connection.driver_class", "com.mysql.cj.jdbc.Driver");
prop.setProperty("hibernate.connection.url", "url");
prop.setProperty("hibernate.connection.username", username);
prop.setProperty("hibernate.connection.password", password);
org.hibernate.cfg.Configuration configuration = new org.hibernate.cfg.Configuration().addProperties(prop);
configuration.addAnnotatedClass(CompetitorConfig.class);
serviceRegistry = new StandardServiceRegistryBuilder().applySettings(configuration.getProperties());
sessionFactory = configuration.buildSessionFactory(serviceRegistry.build());
} catch (Throwable ex) {
throw new ExceptionInInitializerError(ex);
}
}
@Override
public void open(Configuration parameters) throws Exception {
configureHibernateUtil(username,password);
this.session = sessionFactory.openSession();
}
@Override
public void invoke(CompetitorConfig value) throws Exception {
Transaction transaction = null;
try {
transaction = session.beginTransaction();
session.merge(value);
session.flush();
} catch (Exception e) {
throw e;
} finally {
transaction.commit();
}
}
@Override
public void close() throws Exception {
this.session.close();
sessionFactory.close();
}
}