Массовая вставка / обновление с помощью Petapoco

Я использую метод Save() для вставки или обновления записей, но я хотел бы, чтобы он выполнял массовую вставку и массовое обновление только с одним обращением к базе данных. Как мне это сделать?

person silba    schedule 06.07.2011    source источник

Ответы (9)

В моем случае я воспользовался методом database.Execute().

Я создал параметр SQL, в котором была первая часть моей вставки:

var sql = new Sql("insert into myTable(Name, Age, Gender) values");

for (int i = 0; i < pocos.Count ; ++i)
   var p = pocos[i];
   sql.Append("(@0, @1, @2)", p.Name, p.Age , p.Gender);
   if(i != pocos.Count -1)

person taylonr    schedule 10.05.2012
Он создает StackOverflow в методе PetaPoco Sql.Append (), если в списке много элементов if. - person Zelid; 22.09.2012

Я пробовал два разных метода вставки большого количества строк быстрее, чем вставка по умолчанию (что довольно медленно, когда у вас много строк).

1) Составление списка ‹T› сначала из poco, а затем их одновременная вставка в цикл (и в транзакцию):

using (var tr = PetaPocoDb.GetTransaction())
    foreach (var record in listOfRecords)

2) SqlBulkCopy a DataTable:

var bulkCopy = new SqlBulkCopy(connectionString, SqlBulkCopyOptions.TableLock);
bulkCopy.DestinationTableName = "SomeTable";

Чтобы получить мой List ‹T› в DataTable, я использовал Marc Gravells Преобразовать общий список / Enumerable в DataTable?, которая сработала для меня ootb (после того, как я изменил свойства Poco, чтобы они были в том же порядке, что и поля таблицы в db.)

SqlBulkCopy был самым быстрым, примерно на 50% быстрее, чем метод транзакций в (быстрых) тестах производительности, которые я провел с ~ 1000 строками.


person joeriks    schedule 22.11.2011
Я думаю, что ваш первый метод все еще идет в базу данных для каждой вставки - person IvoTops; 20.03.2013
Ага, интересно было бы сравнить скорости совмещенных вставок tsql. В моем случае я перестал копать, чтобы повысить производительность, когда заметил, что работаю всего на 50% медленнее, чем объемное копирование. - person joeriks; 20.03.2013

Вставка в один запрос SQL выполняется намного быстрее.

Вот метод клиента для класса PetaPoco.Database, который добавляет возможность выполнять массовую вставку любой коллекции:

public void BulkInsertRecords<T>(IEnumerable<T> collection)
                using (var cmd = CreateCommand(_sharedConnection, ""))
                    var pd = Database.PocoData.ForType(typeof(T));
                    var tableName = EscapeTableName(pd.TableInfo.TableName);
                    string cols = string.Join(", ", (from c in pd.QueryColumns select tableName + "." + EscapeSqlIdentifier(c)).ToArray());
                    var pocoValues = new List<string>();
                    var index = 0;
                    foreach (var poco in collection)
                        var values = new List<string>();
                        foreach (var i in pd.Columns)
                            values.Add(string.Format("{0}{1}", _paramPrefix, index++));
                            AddParam(cmd, i.Value.GetValue(poco), _paramPrefix);
                        pocoValues.Add("(" + string.Join(",", values.ToArray()) + ")");
                    var sql = string.Format("INSERT INTO {0} ({1}) VALUES {2}", tableName, cols, string.Join(", ", pocoValues));
                    cmd.CommandText = sql;
person Zelid    schedule 23.01.2013
Очень хорошо, было бы интересно сравнить перфомансы с описываемым мною методом транзакции (я считаю, что ваш быстрее, но насколько?) Также - афайк - если вы оберните свои вставки внутри транзакции, вы должны получить немного больше перфоманса (blog.staticvoid.co.nz/2012/4/26/) - person joeriks; 20.03.2013

Вот обновленная версия ответа Стива Янсена, который разбивается на блоки по максимум 2100 пако.

Я закомментировал следующий код, поскольку он создает дубликаты в базе данных ...

                //using (var reader = cmd.ExecuteReader())
                //    while (reader.Read())
                //    {
                //        inserted.Add(reader[0]);
                //    }

Обновленный код

    /// <summary>
    /// Performs an SQL Insert against a collection of pocos
    /// </summary>
    /// <param name="pocos">A collection of POCO objects that specifies the column values to be inserted.  Assumes that every POCO is of the same type.</param>
    /// <returns>An array of the auto allocated primary key of the new record, or null for non-auto-increment tables</returns>
    public object BulkInsert(IEnumerable<object> pocos)
        Sql sql;
        IList<PocoColumn> columns = new List<PocoColumn>();
        IList<object> parameters;
        IList<object> inserted;
        PocoData pd;
        Type primaryKeyType;
        object template;
        string commandText;
        string tableName;
        string primaryKeyName;
        bool autoIncrement;

        int maxBulkInsert;

        if (null == pocos)
            return new object[] { };

        template = pocos.First<object>();

        if (null == template)
            return null;

        pd = PocoData.ForType(template.GetType());
        tableName = pd.TableInfo.TableName;
        primaryKeyName = pd.TableInfo.PrimaryKey;
        autoIncrement = pd.TableInfo.AutoIncrement;

        //Calculate the maximum chunk size
        maxBulkInsert = 2100 / pd.Columns.Count;
        IEnumerable<object> pacosToInsert = pocos.Take(maxBulkInsert);
        IEnumerable<object> pacosremaining = pocos.Skip(maxBulkInsert);

                var names = new List<string>();
                var values = new List<string>();
                var index = 0;

                foreach (var i in pd.Columns)
                    // Don't insert result columns
                    if (i.Value.ResultColumn)

                    // Don't insert the primary key (except under oracle where we need bring in the next sequence value)
                    if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
                        primaryKeyType = i.Value.PropertyInfo.PropertyType;

                        // Setup auto increment expression
                        string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
                        if (autoIncExpression != null)

                    values.Add(string.Format("{0}{1}", _paramPrefix, index++));

                string outputClause = String.Empty;
                if (autoIncrement)
                    outputClause = _dbType.GetInsertOutputClause(primaryKeyName);

                commandText = string.Format("INSERT INTO {0} ({1}){2} VALUES",
                                string.Join(",", names.ToArray()),

                sql = new Sql(commandText);
                parameters = new List<object>();
                string valuesText = string.Concat("(", string.Join(",", values.ToArray()), ")");
                bool isFirstPoco = true;
                var parameterCounter = 0;

                foreach (object poco in pacosToInsert)

                    foreach (PocoColumn column in columns)

                    sql.Append(valuesText, parameters.ToArray<object>());

                    if (isFirstPoco && pocos.Count() > 1)
                        valuesText = "," + valuesText;
                        isFirstPoco = false;

                inserted = new List<object>();

                using (var cmd = CreateCommand(_sharedConnection, sql.SQL, sql.Arguments))
                    if (!autoIncrement)

                        PocoColumn pkColumn;
                        if (primaryKeyName != null && pd.Columns.TryGetValue(primaryKeyName, out pkColumn))
                            foreach (object poco in pocos)

                        return inserted.ToArray<object>();

                    object id = _dbType.ExecuteInsert(this, cmd, primaryKeyName);

                    if (pacosremaining.Any())
                        return BulkInsert(pacosremaining);

                    return id;

                    //using (var reader = cmd.ExecuteReader())
                    //    while (reader.Read())
                    //    {
                    //        inserted.Add(reader[0]);
                    //    }

                    //object[] primaryKeys = inserted.ToArray<object>();

                    //// Assign the ID back to the primary key property
                    //if (primaryKeyName != null)
                    //    PocoColumn pc;
                    //    if (pd.Columns.TryGetValue(primaryKeyName, out pc))
                    //    {
                    //        index = 0;
                    //        foreach (object poco in pocos)
                    //        {
                    //            pc.SetValue(poco, pc.ChangeType(primaryKeys[index]));
                    //            index++;
                    //        }
                    //    }

                    //return primaryKeys;
        catch (Exception x)
            if (OnException(x))
            return null;
person robgha01    schedule 23.06.2013

Ниже приведен метод BulkInsert PetaPoco, который расширяет очень умную идею taylonr по использованию техники SQL для вставки нескольких строк через INSERT INTO tab(col1, col2) OUTPUT inserted.[ID] VALUES (@0, @1), (@2, 3), (@4, @5), ..., (@n-1, @n).

Он также возвращает значения автоинкремента (идентичности) вставленных записей, что, я не думаю, происходит в реализации IvoTops.

ПРИМЕЧАНИЕ. SQL Server 2012 (и ниже) имеет ограничение в 2100 параметров на запрос. (Вероятно, это источник исключения переполнения стека, на которое ссылается комментарий Zelid). Вам нужно будет вручную разделить пакеты по количеству столбцов, которые не обозначены как Ignore или Result. Например, POCO с 21 столбцом следует отправлять пакетами размером 99 или (2100 - 1) / 21. Я могу провести рефакторинг для динамического разделения пакетов на основе этого ограничения для SQL Server; однако вы всегда будете видеть наилучшие результаты, управляя размером пакета вне этого метода.

Этот метод показал примерно 50% -ное увеличение времени выполнения по сравнению с моей предыдущей техникой использования общего соединения в одной транзакции для всех вставок.

Это одна из областей, в которой Massive действительно выделяется - у Massive есть Save (params object [] things), который создает массив IDbCommands и выполняет каждую из них в общем соединении. Он работает "из коробки" и не выходит за рамки ограничений по параметрам.

/// <summary>
/// Performs an SQL Insert against a collection of pocos
/// </summary>
/// <param name="pocos">A collection of POCO objects that specifies the column values to be inserted.  Assumes that every POCO is of the same type.</param>
/// <returns>An array of the auto allocated primary key of the new record, or null for non-auto-increment tables</returns>
/// <remarks>
///     NOTE: As of SQL Server 2012, there is a limit of 2100 parameters per query.  This limitation does not seem to apply on other platforms, so 
///           this method will allow more than 2100 parameters.  See http://msdn.microsoft.com/en-us/library/ms143432.aspx
///     The name of the table, it's primary key and whether it's an auto-allocated primary key are retrieved from the attributes of the first POCO in the collection
/// </remarks>
public object[] BulkInsert(IEnumerable<object> pocos)
    Sql sql;
    IList<PocoColumn> columns = new List<PocoColumn>();
    IList<object> parameters;
    IList<object> inserted;
    PocoData pd;
    Type primaryKeyType;
    object template;
    string commandText;
    string tableName;
    string primaryKeyName;
    bool autoIncrement;

    if (null == pocos)
        return new object[] {};

    template = pocos.First<object>();

    if (null == template)
        return null;

    pd = PocoData.ForType(template.GetType());
    tableName = pd.TableInfo.TableName;
    primaryKeyName = pd.TableInfo.PrimaryKey;
    autoIncrement = pd.TableInfo.AutoIncrement;

            var names = new List<string>();
            var values = new List<string>();
            var index = 0;
            foreach (var i in pd.Columns)
                // Don't insert result columns
                if (i.Value.ResultColumn)

                // Don't insert the primary key (except under oracle where we need bring in the next sequence value)
                if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
                    primaryKeyType = i.Value.PropertyInfo.PropertyType;

                    // Setup auto increment expression
                    string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
                    if (autoIncExpression != null)

                values.Add(string.Format("{0}{1}", _paramPrefix, index++));

            string outputClause = String.Empty;
            if (autoIncrement)
                outputClause = _dbType.GetInsertOutputClause(primaryKeyName);

            commandText = string.Format("INSERT INTO {0} ({1}){2} VALUES",
                            string.Join(",", names.ToArray()),

            sql = new Sql(commandText);
            parameters = new List<object>();
            string valuesText = string.Concat("(", string.Join(",", values.ToArray()), ")");
            bool isFirstPoco = true;

            foreach (object poco in pocos)
                foreach (PocoColumn column in columns)

                sql.Append(valuesText, parameters.ToArray<object>());

                if (isFirstPoco)
                    valuesText = "," + valuesText;
                    isFirstPoco = false;

            inserted = new List<object>();

            using (var cmd = CreateCommand(_sharedConnection, sql.SQL, sql.Arguments))
                if (!autoIncrement)

                    PocoColumn pkColumn;
                    if (primaryKeyName != null && pd.Columns.TryGetValue(primaryKeyName, out pkColumn))
                        foreach (object poco in pocos)

                    return inserted.ToArray<object>();

                // BUG: the following line reportedly causes duplicate inserts; need to confirm
                //object id = _dbType.ExecuteInsert(this, cmd, primaryKeyName);

                using(var reader = cmd.ExecuteReader())
                    while (reader.Read())

                object[] primaryKeys = inserted.ToArray<object>();

                // Assign the ID back to the primary key property
                if (primaryKeyName != null)
                    PocoColumn pc;
                    if (pd.Columns.TryGetValue(primaryKeyName, out pc))
                        index = 0;
                        foreach(object poco in pocos)
                            pc.SetValue(poco, pc.ChangeType(primaryKeys[index]));

                return primaryKeys;
    catch (Exception x)
        if (OnException(x))
        return null;
person Steve Jansen    schedule 18.06.2013
Другой ответ указал на серьезную ошибку в пути автоинкремента, из-за которой каждая запись вставляется дважды, но в некотором роде все испортил в процессе. Все, что мне нужно было сделать, это удалить следующую строку: object id = _dbType.ExecuteInsert(this, cmd, primaryKeyName);. Я бы отредактировал ваш пост, но вам, вероятно, следует проверить, действительно ли указанная строка может быть безопасно удалена. - person Aaronaught; 10.05.2014
Привет, @Aaronaught, спасибо за внимание. Я закомментировал эту строку, пока не смогу получить доступ к ящику Windows с VS для тестирования. Эта ошибка меня не удивляет, так как база данных, для которой я создал этот код, довольно снисходительна к повторяющимся вставкам. - person Steve Jansen; 11.05.2014

Вот код для BulkInsert, который вы можете добавить в v5.01 PetaPoco.cs

Вы можете вставить его где-нибудь рядом с обычной вставкой в ​​строке 1098

Вы даете ему IEnumerable Pocos, и он отправляет его в базу данных

партиями по x вместе. Код на 90% от штатной вставки.

У меня нет сравнения производительности, дайте знать :)

    /// <summary>
    /// Bulk inserts multiple rows to SQL
    /// </summary>
    /// <param name="tableName">The name of the table to insert into</param>
    /// <param name="primaryKeyName">The name of the primary key column of the table</param>
    /// <param name="autoIncrement">True if the primary key is automatically allocated by the DB</param>
    /// <param name="pocos">The POCO objects that specifies the column values to be inserted</param>
    /// <param name="batchSize">The number of POCOS to be grouped together for each database rounddtrip</param>        
    public void BulkInsert(string tableName, string primaryKeyName, bool autoIncrement, IEnumerable<object> pocos, int batchSize = 25)
                using (var cmd = CreateCommand(_sharedConnection, ""))
                    var pd = PocoData.ForObject(pocos.First(), primaryKeyName);
                    // Create list of columnnames only once
                    var names = new List<string>();
                    foreach (var i in pd.Columns)
                        // Don't insert result columns
                        if (i.Value.ResultColumn)

                        // Don't insert the primary key (except under oracle where we need bring in the next sequence value)
                        if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
                            // Setup auto increment expression
                            string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
                            if (autoIncExpression != null)
                    var namesArray = names.ToArray();

                    var values = new List<string>();
                    int count = 0;
                        cmd.CommandText = "";
                        var index = 0;
                        foreach (var poco in pocos.Skip(count).Take(batchSize))
                            foreach (var i in pd.Columns)
                                // Don't insert result columns
                                if (i.Value.ResultColumn) continue;

                                // Don't insert the primary key (except under oracle where we need bring in the next sequence value)
                                if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
                                    // Setup auto increment expression
                                    string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
                                    if (autoIncExpression != null)

                                values.Add(string.Format("{0}{1}", _paramPrefix, index++));
                                AddParam(cmd, i.Value.GetValue(poco), i.Value.PropertyInfo);

                            string outputClause = String.Empty;
                            if (autoIncrement)
                                outputClause = _dbType.GetInsertOutputClause(primaryKeyName);

                            cmd.CommandText += string.Format("INSERT INTO {0} ({1}){2} VALUES ({3})", _dbType.EscapeTableName(tableName),
                                                             string.Join(",", namesArray), outputClause, string.Join(",", values.ToArray()));
                        // Are we done?
                        if (cmd.CommandText == "") break;
                        count += batchSize;
                    while (true);

        catch (Exception x)
            if (OnException(x))

    /// <summary>
    /// Performs a SQL Bulk Insert
    /// </summary>
    /// <param name="pocos">The POCO objects that specifies the column values to be inserted</param>        
    /// <param name="batchSize">The number of POCOS to be grouped together for each database rounddtrip</param>        
    public void BulkInsert(IEnumerable<object> pocos, int batchSize = 25)
        if (!pocos.Any()) return;
        var pd = PocoData.ForType(pocos.First().GetType());
        BulkInsert(pd.TableInfo.TableName, pd.TableInfo.PrimaryKey, pd.TableInfo.AutoIncrement, pocos);
person IvoTops    schedule 20.03.2013

И в тех же строках, если вы хотите BulkUpdate:

public void BulkUpdate<T>(string tableName, string primaryKeyName, IEnumerable<T> pocos, int batchSize = 25)
        object primaryKeyValue = null;

            using (var cmd = CreateCommand(_sharedConnection, ""))
                var pd = PocoData.ForObject(pocos.First(), primaryKeyName);

                int count = 0;
                    cmd.CommandText = "";
                    var index = 0;

                    var cmdText = new StringBuilder();

                    foreach (var poco in pocos.Skip(count).Take(batchSize))
                        var sb = new StringBuilder();
                        var colIdx = 0;
                        foreach (var i in pd.Columns)
                            // Don't update the primary key, but grab the value if we don't have it
                            if (string.Compare(i.Key, primaryKeyName, true) == 0)
                                primaryKeyValue = i.Value.GetValue(poco);

                            // Dont update result only columns
                            if (i.Value.ResultColumn)

                            // Build the sql
                            if (colIdx > 0)
                                sb.Append(", ");
                            sb.AppendFormat("{0} = {1}{2}", _dbType.EscapeSqlIdentifier(i.Key), _paramPrefix,

                            // Store the parameter in the command
                            AddParam(cmd, i.Value.GetValue(poco), i.Value.PropertyInfo);

                        // Find the property info for the primary key
                        PropertyInfo pkpi = null;
                        if (primaryKeyName != null)
                            pkpi = pd.Columns[primaryKeyName].PropertyInfo;

                        cmdText.Append(string.Format("UPDATE {0} SET {1} WHERE {2} = {3}{4};\n",
                                                     _dbType.EscapeTableName(tableName), sb.ToString(),
                                                     _dbType.EscapeSqlIdentifier(primaryKeyName), _paramPrefix,
                        AddParam(cmd, primaryKeyValue, pkpi);

                    if (cmdText.Length == 0) break;

                    if (_providerName.IndexOf("oracle", StringComparison.OrdinalIgnoreCase) >= 0)
                        cmdText.Insert(0, "BEGIN\n");
                        cmdText.Append("\n END;");


                    cmd.CommandText = cmdText.ToString();
                    count += batchSize;

                } while (true);
    catch (Exception x)
        if (OnException(x))
person Phalgun Erra    schedule 26.09.2013

Вот хорошее обновление 2018 года с использованием FastMember от NuGet:

    private static void SqlBulkCopyPoco<T>(PetaPoco.Database db, IEnumerable<T> data)
        var pd = PocoData.ForType(typeof(T), db.DefaultMapper);
        using (var bcp = new SqlBulkCopy(db.ConnectionString))
        using (var reader = ObjectReader.Create(data)) 
            // set up a mapping from the property names to the column names
            var propNames = typeof(T).GetProperties().Select(propertyInfo => propertyInfo.Name).ToArray();
            foreach (var propName in propNames)
                bcp.ColumnMappings.Add(propName, "[" + pd.GetColumnName(propName) + "]");
            bcp.DestinationTableName = pd.TableInfo.TableName;
person Don    schedule 23.03.2018

Вы можете просто выполнить foreach в своих записях.

foreach (var record in records) {
person Schotime    schedule 06.07.2011
Это создает только одно обращение к базе данных? - person Dan; 13.09.2011
Нет, он попадает в базу данных один раз за запись. Как вы ожидаете сделать это за одно обращение к базе данных? Если вы просто не хотите сгенерировать оператор обновления и выполнить его. - person Schotime; 14.09.2011