Logo Search packages:      
Sourcecode: sqlalchemy version File versions  Download package

def sqlalchemy::orm::mapper::Mapper::save_obj (   self,
  objects,
  uowtransaction,
  postupdate = False,
  post_update_cols = None,
  single = False 
)

Issue INSERT and/or UPDATE statements for a list of objects.

This is called within the context of a UOWTransaction during a flush operation.

save_obj issues SQL statements not just for instances mapped directly by this mapper, but
for instances mapped by all inheriting mappers as well.  This is to maintain proper insert
ordering among a polymorphic chain of instances. Therefore save_obj is typically 
called only on a "base mapper", or a mapper which does not inherit from any other mapper.

Definition at line 799 of file mapper.py.

def save_obj(self, objects, uowtransaction, postupdate=False, post_update_cols=None, single=False):
        """Issue INSERT and/or UPDATE statements for a list of objects.

        This is called within the context of a UOWTransaction during a flush operation.

        save_obj issues SQL statements not just for instances mapped directly by this mapper, but
        for instances mapped by all inheriting mappers as well.  This is to maintain proper insert
        ordering among a polymorphic chain of instances. Therefore save_obj is typically
        called only on a "base mapper", or a mapper which does not inherit from any other mapper.

        objects
          the instances to be saved.

        uowtransaction
          the flush transaction; supplies the connection, the identity map
          (``uowtransaction.uow.identity_map``) and the deleted/unregister bookkeeping
          used for row-switch detection.

        postupdate
          when True, no INSERT is issued (every row is treated as an UPDATE) and the
          before_/after_ insert/update extension hooks are not invoked.

        post_update_cols
          when not None, an UPDATE only considers non-primary-key columns that are
          contained in this collection.

        single
          True when this call handles exactly one object on behalf of a non-batched
          mapper; when False and ``self.batch`` is false, save_obj re-invokes itself
          once per object with ``single=True``.

        Raises ``exceptions.FlushError`` when a pending instance has the same identity key
        as a persistent instance that is not marked deleted, and
        ``exceptions.ConcurrentModificationError`` when the rowcount reported for the
        UPDATE statements of a table does not match the number of rows sent.
        """

        self.__log_debug("save_obj() start, " + (single and "non-batched" or "batched"))

        # if batch=false, call save_obj separately for each object
        if not single and not self.batch:
            for obj in objects:
                self.save_obj([obj], uowtransaction, postupdate=postupdate, post_update_cols=post_update_cols, single=True)
            return

        connection = uowtransaction.transaction.connection(self)

        # fire the "before" extension hooks on every mapper from the object's own
        # mapper up to the root of its inheritance chain
        if not postupdate:
            for obj in objects:
                if not has_identity(obj):
                    for mapper in object_mapper(obj).iterate_to_root():
                        mapper.extension.before_insert(mapper, connection, obj)
                else:
                    for mapper in object_mapper(obj).iterate_to_root():
                        mapper.extension.before_update(mapper, connection, obj)

        for obj in objects:
            # detect if we have a "pending" instance (i.e. has no instance_key attached to it),
            # and another instance with the same identity key already exists as persistent.  convert to an
            # UPDATE if so.
            mapper = object_mapper(obj)
            instance_key = mapper.instance_key(obj)
            is_row_switch = not postupdate and not has_identity(obj) and instance_key in uowtransaction.uow.identity_map
            if is_row_switch:
                existing = uowtransaction.uow.identity_map[instance_key]
                if not uowtransaction.is_deleted(existing):
                    raise exceptions.FlushError("New instance %s with identity key %s conflicts with persistent instance %s" % (mapperutil.instance_str(obj), str(instance_key), mapperutil.instance_str(existing)))
                self.__log_debug("detected row switch for identity %s.  will update %s, remove %s from transaction" % (instance_key, mapperutil.instance_str(obj), mapperutil.instance_str(existing)))
                uowtransaction.unregister_object(existing)

        inserted_objects = util.Set()
        updated_objects = util.Set()

        # map each table to the first mapper in the polymorphic iteration that uses it
        table_to_mapper = {}
        for mapper in self.base_mapper().polymorphic_iterator():
            for t in mapper.tables:
                table_to_mapper.setdefault(t, mapper)

        for table in sqlutil.TableCollection(list(table_to_mapper.keys())).sort(reverse=False):
            # two lists to store parameters for each table/object pair located
            insert = []
            update = []

            for obj in objects:
                mapper = object_mapper(obj)
                if table not in mapper.tables or not mapper._has_pks(table):
                    continue
                instance_key = mapper.instance_key(obj)
                self.__log_debug("save_obj() table '%s' instance %s identity %s" % (table.name, mapperutil.instance_str(obj), str(instance_key)))

                # a "pending" object has no identity yet; it is INSERTed unless its
                # identity key is already present in the identity map, in which case it
                # is a row switch (same predicate as the detection loop above) and is
                # UPDATEd instead.  This must be decided per object: reusing the flag
                # left over from the detection loop would apply the *last* object's
                # row-switch status to every object in the batch.
                pending = not postupdate and not has_identity(obj)
                in_identity_map = instance_key in uowtransaction.uow.identity_map
                isinsert = pending and not in_identity_map
                is_row_switch = pending and in_identity_map

                params = {}
                hasdata = False
                for col in table.columns:
                    if col is mapper.version_id_col:
                        if not isinsert:
                            # current version goes to the WHERE bindparam, the
                            # incremented version to the SET clause
                            params[col._label] = mapper.get_attr_by_column(obj, col)
                            params[col.key] = params[col._label] + 1
                        else:
                            params[col.key] = 1
                    elif col in mapper.pks_by_table[table]:
                        # column is a primary key ?
                        if not isinsert:
                            # doing an UPDATE?  put primary key values as "WHERE" parameters
                            # matching the bindparam we are creating below, i.e. "<tablename>_<colname>"
                            params[col._label] = mapper.get_attr_by_column(obj, col)
                        else:
                            # doing an INSERT, primary key col ?
                            # if the primary key values are not populated,
                            # leave them out of the INSERT altogether, since PostGres doesn't want
                            # them to be present for SERIAL to take effect.  A SQLEngine that uses
                            # explicit sequences will put them back in if they are needed
                            value = mapper.get_attr_by_column(obj, col)
                            if value is not None:
                                params[col.key] = value
                    elif mapper.polymorphic_on is not None and mapper.polymorphic_on.shares_lineage(col):
                        # discriminator column: only written on INSERT
                        if isinsert:
                            self.__log_debug("Using polymorphic identity '%s' for insert column '%s'" % (mapper.polymorphic_identity, col.key))
                            value = mapper.polymorphic_identity
                            if col.default is None or value is not None:
                                params[col.key] = value
                    else:
                        # column is not a primary key ?
                        if not isinsert:
                            # doing an UPDATE ? get the history for the attribute, with "passive"
                            # so as not to trigger any deferred loads.  if there is a new
                            # value, add it to the bind parameters
                            if post_update_cols is not None and col not in post_update_cols:
                                continue
                            elif is_row_switch:
                                # a row switch writes every column of the new object.
                                # read through the object's own mapper: "self" is usually
                                # the base mapper and may not map this table's columns.
                                params[col.key] = mapper.get_attr_by_column(obj, col)
                                hasdata = True
                                continue
                            prop = mapper._getpropbycolumn(col, False)
                            if prop is None:
                                continue
                            history = prop.get_history(obj, passive=True)
                            if history:
                                a = history.added_items()
                                if len(a):
                                    params[col.key] = a[0]
                                    hasdata = True
                        else:
                            # doing an INSERT, non primary key col ?
                            # add the attribute's value to the
                            # bind parameters, unless its None and the column has a
                            # default.  if its None and theres no default, we still might
                            # not want to put it in the col list but SQLIte doesnt seem to like that
                            # if theres no columns at all
                            value = mapper.get_attr_by_column(obj, col, False)
                            if value is NO_ATTRIBUTE:
                                continue
                            if col.default is None or value is not None:
                                params[col.key] = value

                if not isinsert:
                    if hasdata:
                        # if none of the attributes changed, dont even
                        # add the row to be updated.
                        update.append((obj, params, mapper))
                else:
                    insert.append((obj, params, mapper))

            if update:
                mapper = table_to_mapper[table]
                pk_cols = mapper.pks_by_table[table]
                clause = sql.and_()
                for col in pk_cols:
                    clause.clauses.append(col == sql.bindparam(col._label, type=col.type))
                if mapper.version_id_col is not None:
                    # type the bindparam with the version column's own type, not the
                    # type of whichever primary key column the loop above ended on
                    version_col = mapper.version_id_col
                    clause.clauses.append(version_col == sql.bindparam(version_col._label, type=version_col.type))
                statement = table.update(clause)
                rows = 0

                # issue the UPDATEs ordered by primary key value
                def comparator(a, b):
                    for col in pk_cols:
                        x = cmp(a[1][col._label], b[1][col._label])
                        if x != 0:
                            return x
                    return 0
                update.sort(comparator)
                for rec in update:
                    (obj, params, mapper) = rec
                    c = connection.execute(statement, params)
                    mapper._postfetch(connection, table, obj, c, c.last_updated_params())

                    updated_objects.add(obj)
                    rows += c.cursor.rowcount

                # "c" is the result of the last UPDATE executed above ("update" is non-empty here)
                if c.supports_sane_rowcount() and rows != len(update):
                    raise exceptions.ConcurrentModificationError("Updated rowcount %d does not match number of objects updated %d" % (rows, len(update)))

            if insert:
                statement = table.insert()

                # issue the INSERTs in the order recorded in each object's _sa_insert_order
                def comparator(a, b):
                    return cmp(a[0]._sa_insert_order, b[0]._sa_insert_order)
                insert.sort(comparator)
                for rec in insert:
                    (obj, params, mapper) = rec
                    c = connection.execute(statement, params)
                    primary_key = c.last_inserted_ids()
                    if primary_key is not None:
                        # copy generated primary key values onto attributes that are still None
                        for i, col in enumerate(mapper.pks_by_table[table]):
                            if mapper.get_attr_by_column(obj, col) is None and len(primary_key) > i:
                                mapper.set_attr_by_column(obj, col, primary_key[i])
                    mapper._postfetch(connection, table, obj, c, c.last_inserted_params())

                    # synchronize newly inserted ids from one table to the next
                    # TODO: this fires off more than needed, try to organize syncrules
                    # per table
                    def sync(mapper):
                        inherit = mapper.inherits
                        if inherit is not None:
                            sync(inherit)
                        if mapper._synchronizer is not None:
                            mapper._synchronizer.execute(obj, obj)
                    sync(mapper)

                    inserted_objects.add(obj)

        # fire the "after" extension hooks, again from each object's mapper up to the root
        if not postupdate:
            for obj in inserted_objects:
                for mapper in object_mapper(obj).iterate_to_root():
                    mapper.extension.after_insert(mapper, connection, obj)
            for obj in updated_objects:
                for mapper in object_mapper(obj).iterate_to_root():
                    mapper.extension.after_update(mapper, connection, obj)

    def _postfetch(self, connection, table, obj, resultproxy, params):


Generated by  Doxygen 1.6.0   Back to index