pgbulk

Bulk Postgres upsert and update functions.

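Briefly, these are the core functions and objects:

  • pgbulk.upsert performs a bulk upsert.
  • pgbulk.update performs a bulk update.
  • pgbulk.aupsert and pgbulk.aupdate are the asynchronous versions of pgbulk.upsert and pgbulk.update.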

pgbulk.upsert has other objects related to advanced usage:

  • pgbulk.UpsertResult encapsulates created and updated values when using the returning flag of pgbulk.upsert.
  • pgbulk.UpdateField allows one to specify expressions for updating fields in the upsert, for example, incrementing fields or conditionally ignoring updates.

pgbulk.UpdateField

UpdateField(field: str, expression: Union[Expression, None] = None)

Bases: UserString

Expresses an update field as an expression in an upsert operation.

Example:

results = pgbulk.upsert(
    MyModel,
    [
        MyModel(some_int_field=0, some_key="a"),
        MyModel(some_int_field=0, some_key="b")
    ],
    ["some_key"],
    [
        pgbulk.UpdateField(
            "some_int_field",
            expression=models.F('some_int_field') + 1
        )
    ],
)
Source code in pgbulk/core.py
def __init__(self, field: str, expression: Union[models.Expression, None] = None):
    self.data = field
    self.expression = expression
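
Because UpdateField derives from UserString, it can sit in a list of field names and still carry extra data. The sketch below uses a hypothetical stand-in class, UpdateFieldSketch, and a plain string in place of a Django expression so that it runs without Django installed. It only illustrates the string-like behaviour, not how pgbulk consumes the expression.

```python
from collections import UserString


class UpdateFieldSketch(UserString):
    """Hypothetical stand-in for pgbulk.UpdateField, for illustration only."""

    def __init__(self, field, expression=None):
        # UserString keeps its text in `data`; assigning it directly mirrors
        # the constructor shown in the source listing.
        self.data = field
        self.expression = expression


plain = UpdateFieldSketch("some_attr")
# A plain string stands in for a Django expression such as F(...) + 1.
with_expr = UpdateFieldSketch("some_int_field", expression="some_int_field + 1")

# The object compares and hashes like the field name it wraps.
print(plain == "some_attr")
print(with_expr in ["some_int_field"])
print(hash(plain) == hash("some_attr"))

# The optional expression rides along as an ordinary attribute.
print(plain.expression, with_expr.expression)
```

A caller can therefore mix plain field names and expression-carrying fields in the same update_fields list.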

pgbulk.UpsertResult

Bases: list

Returned by pgbulk.upsert when the returning argument is provided.

Wraps a list of named tuples where the names correspond to the underlying Django model attribute names.

Also provides properties to access created and updated rows.

created property

created: List[namedtuple]

Return the created rows

updated property

updated: List[namedtuple]

Return the updated rows
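
A minimal sketch of how a list subclass can expose created and updated views over named tuples. The class name UpsertResultSketch, the status_ field, and its "c"/"u" values are assumptions made for this example; the page above does not show how pgbulk tracks row status.

```python
from collections import namedtuple


class UpsertResultSketch(list):
    """A list of named tuples with helpers that split created from updated rows."""

    @property
    def created(self):
        return [row for row in self if row.status_ == "c"]

    @property
    def updated(self):
        return [row for row in self if row.status_ == "u"]


# `status_` is an assumed marker column: "c" for created, "u" for updated.
Row = namedtuple("Row", ["int_field", "some_attr", "status_"])

results = UpsertResultSketch(
    [
        Row(1, "some_val1", "c"),
        Row(2, "some_val2", "u"),
    ]
)

print([row.int_field for row in results.created])  # [1]
print([row.int_field for row in results.updated])  # [2]
print(len(results))  # 2
```

Since the result is still a list, iterating over it directly yields every returned row regardless of status.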

pgbulk.aupdate async

aupdate(
    queryset: Union[Type[Model], QuerySet],
    model_objs: Iterable[Model],
    update_fields: Union[List[str], None] = None,
    exclude: Union[List[str], None] = None,
) -> None

Perform an asynchronous bulk update.

See pgbulk.update

Note

Like other async Django ORM methods, aupdate currently wraps update in a sync_to_async wrapper. It does not yet use an asynchronous database driver but will in the future.
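
The note above means the database work still runs synchronously, just off the event loop. As a rough illustration of that pattern, the sketch below wraps a blocking function with the standard library's asyncio.to_thread. This is a stand-in chosen so the example runs without Django: pgbulk itself uses sync_to_async, and update_sync and aupdate_sketch are hypothetical names.

```python
import asyncio


def update_sync(rows):
    """Stand-in for a blocking bulk update; returns the number of rows handled."""
    return len(rows)


async def aupdate_sketch(rows):
    # Run the blocking call in a worker thread so the event loop stays
    # responsive. No asynchronous database driver is involved.
    return await asyncio.to_thread(update_sync, rows)


count = asyncio.run(aupdate_sketch([{"id": 1}, {"id": 2}]))
print(count)  # 2
```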

Source code in pgbulk/core.py
async def aupdate(
    queryset: Union[Type[models.Model], models.QuerySet],
    model_objs: Iterable[models.Model],
    update_fields: Union[List[str], None] = None,
    exclude: Union[List[str], None] = None,
) -> None:
    """
    Perform an asynchronous bulk update.

    See [pgbulk.update][]

    Note:
        Like other async Django ORM methods, `aupdate` currently wraps `update` in
        a `sync_to_async` wrapper. It does not yet use an asynchronous database
        driver but will in the future.
    """
    return await sync_to_async(update)(queryset, model_objs, update_fields, exclude)

pgbulk.aupsert async

aupsert(
    queryset: Union[Type[Model], QuerySet],
    model_objs: Iterable[Model],
    unique_fields: List[str],
    update_fields: UpdateFieldsTypeDef = None,
    *,
    returning: Union[List[str], bool] = False,
    exclude: Union[List[str], None] = None,
    redundant_updates: bool = False
) -> UpsertResult

Perform an asynchronous bulk upsert.

See pgbulk.upsert

Note

Like other async Django ORM methods, aupsert currently wraps upsert in a sync_to_async wrapper. It does not yet use an asynchronous database driver but will in the future.

Source code in pgbulk/core.py
async def aupsert(
    queryset: Union[Type[models.Model], models.QuerySet],
    model_objs: Iterable[models.Model],
    unique_fields: List[str],
    update_fields: UpdateFieldsTypeDef = None,
    *,
    returning: Union[List[str], bool] = False,
    exclude: Union[List[str], None] = None,
    redundant_updates: bool = False,
) -> UpsertResult:
    """
    Perform an asynchronous bulk upsert.

    See [pgbulk.upsert][]

    Note:
        Like other async Django ORM methods, `aupsert` currently wraps `upsert` in
        a `sync_to_async` wrapper. It does not yet use an asynchronous database
        driver but will in the future.
    """
    return await sync_to_async(upsert)(
        queryset,
        model_objs,
        unique_fields,
        update_fields,
        returning=returning,
        exclude=exclude,
        redundant_updates=redundant_updates,
    )

pgbulk.update

update(
    queryset: Union[Type[Model], QuerySet],
    model_objs: Iterable[Model],
    update_fields: Union[List[str], None] = None,
    exclude: Union[List[str], None] = None,
) -> None

Performs a bulk update.

Parameters:

  • queryset (Union[Type[Model], QuerySet], required): The queryset to use when bulk updating.
  • model_objs (Iterable[Model], required): Model object values to use for the update.
  • update_fields (Union[List[str], None], default None): A list of fields on the model objects to update. If None, all fields will be updated.
  • exclude (Union[List[str], None], default None): A list of fields to exclude from the update. This is useful when update_fields is None and you want to exclude fields from being updated.

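
One plausible reading of how update_fields and exclude combine can be sketched in plain Python. The helper resolve_update_fields is hypothetical, and leaving the primary key out of the default set is an assumption; the real logic lives in a private helper that this page does not show.

```python
def resolve_update_fields(all_fields, pk_field, update_fields=None, exclude=None):
    """Hypothetical helper: pick the fields a bulk update would write."""
    if update_fields is None:
        # None means "every field". The primary key is left out here because
        # it identifies the row rather than being updated (an assumption).
        update_fields = [f for f in all_fields if f != pk_field]
    excluded = set(exclude or [])
    return [f for f in update_fields if f not in excluded]


fields = ["id", "some_attr", "other_attr", "created_at"]

print(resolve_update_fields(fields, "id"))
# ['some_attr', 'other_attr', 'created_at']
print(resolve_update_fields(fields, "id", exclude=["created_at"]))
# ['some_attr', 'other_attr']
print(resolve_update_fields(fields, "id", update_fields=["some_attr"]))
# ['some_attr']
```
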
Note

Model signals such as post_save are not emitted.

Example

Update an attribute of multiple models in bulk:

import pgbulk

pgbulk.update(
    MyModel,
    [
        MyModel(id=1, some_attr='some_val1'),
        MyModel(id=2, some_attr='some_val2')
    ],
    # These are the fields that will be updated. If not provided,
    # all fields will be updated
    ['some_attr']
)
Source code in pgbulk/core.py
def update(
    queryset: Union[Type[models.Model], models.QuerySet],
    model_objs: Iterable[models.Model],
    update_fields: Union[List[str], None] = None,
    exclude: Union[List[str], None] = None,
) -> None:
    """
    Performs a bulk update.

    Args:
        queryset: The queryset to use when bulk updating
        model_objs: Model object values to use for the update
        update_fields: A list of fields on the
            model objects to update. If `None`, all fields will be updated.
        exclude: A list of fields to exclude from the update. This is useful
            when `update_fields` is `None` and you want to exclude fields from
            being updated.

    Note:
        Model signals such as `post_save` are not emitted.

    Example:
        Update an attribute of multiple models in bulk::

            import pgbulk

            pgbulk.update(
                MyModel,
                [
                    MyModel(id=1, some_attr='some_val1'),
                    MyModel(id=2, some_attr='some_val2')
                ],
                # These are the fields that will be updated. If not provided,
                # all fields will be updated
                ['some_attr']
            )
    """
    queryset = queryset if isinstance(queryset, models.QuerySet) else queryset.objects.all()  # type: ignore
    connection = connections[queryset.db]
    model = queryset.model
    update_fields = _get_update_fields(queryset, update_fields, exclude)

    # Sort the model objects to reduce the likelihood of deadlocks
    model_objs = sorted(model_objs, key=lambda obj: obj.pk)

    # Add the pk to the value fields so we can join during the update
    value_fields = [model._meta.pk.attname] + update_fields

    row_values = [
        [
            _get_field_db_val(
                queryset,
                model_obj._meta.get_field(field),
                getattr(model_obj, model_obj._meta.get_field(field).attname),  # type: ignore
                connection,
            )
            for field in value_fields
        ]
        for model_obj in model_objs
    ]

    # If we do not have any values or fields to update, just return
    if len(row_values) == 0 or len(update_fields) == 0:
        return

    db_types = [model._meta.get_field(field).db_type(connection) for field in value_fields]

    value_fields_sql = ", ".join(
        '"{field}"'.format(field=model._meta.get_field(field).column) for field in value_fields
    )

    update_fields_sql = ", ".join(
        [
            '"{field}" = "new_values"."{field}"'.format(field=model._meta.get_field(field).column)
            for field in update_fields
        ]
    )

    values_sql = ", ".join(
        [
            "({0})".format(
                ", ".join(
                    [
                        "%s::{0}".format(db_types[i]) if not row_number and i else "%s"
                        for i, _ in enumerate(row)
                    ]
                )
            )
            for row_number, row in enumerate(row_values)
        ]
    )

    update_sql = (
        "UPDATE {table} "
        "SET {update_fields_sql} "
        "FROM (VALUES {values_sql}) AS new_values ({value_fields_sql}) "
        'WHERE "{table}"."{pk_field}" = "new_values"."{pk_field}"'
    ).format(
        table=model._meta.db_table,
        pk_field=model._meta.pk.column,
        update_fields_sql=update_fields_sql,
        values_sql=values_sql,
        value_fields_sql=value_fields_sql,
    )

    update_sql_params = list(itertools.chain(*row_values))

    with connection.cursor() as cursor:
        update_sql_params = _prep_sql_args(queryset, connection, cursor, update_sql_params)
        cursor.execute(update_sql, update_sql_params)  # type: ignore
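
To make the string assembly above easier to follow, here is a condensed sketch that builds the same shape of statement for a hypothetical table my_model with two rows. The function name build_update_sql and the column types are assumptions. The type cast appears only in the first VALUES row and not on the primary key column, mirroring the condition in the source.

```python
def build_update_sql(table, pk_column, update_columns, db_types, row_count):
    """Assemble an UPDATE ... FROM (VALUES ...) statement for `row_count` rows.

    `db_types` lists the type of the primary key followed by the update columns.
    """
    value_columns = [pk_column] + update_columns
    set_sql = ", ".join(f'"{c}" = "new_values"."{c}"' for c in update_columns)
    columns_sql = ", ".join(f'"{c}"' for c in value_columns)
    rows_sql = ", ".join(
        "({})".format(
            ", ".join(
                # Cast only in the first row, and skip the first column.
                f"%s::{db_types[i]}" if row_number == 0 and i else "%s"
                for i in range(len(value_columns))
            )
        )
        for row_number in range(row_count)
    )
    return (
        f"UPDATE {table} SET {set_sql} "
        f"FROM (VALUES {rows_sql}) AS new_values ({columns_sql}) "
        f'WHERE "{table}"."{pk_column}" = "new_values"."{pk_column}"'
    )


sql = build_update_sql("my_model", "id", ["some_attr"], ["integer", "varchar(50)"], 2)
print(sql)
```

The placeholders are later filled from the flattened row values, so a two-row update of one field passes four parameters in a single statement.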

pgbulk.upsert

upsert(
    queryset: Union[Type[Model], QuerySet],
    model_objs: Iterable[Model],
    unique_fields: List[str],
    update_fields: UpdateFieldsTypeDef = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Union[List[str], bool] = False,
    redundant_updates: bool = False
) -> UpsertResult

Perform a bulk upsert.

Parameters:

  • queryset (Union[Type[Model], QuerySet], required): A model or a queryset that defines the collection to upsert.
  • model_objs (Iterable[Model], required): An iterable of Django models to upsert. All models in this list will be bulk upserted.
  • unique_fields (List[str], required): A list of fields that define the uniqueness of the model. The model must have a unique constraint on these fields.
  • update_fields (UpdateFieldsTypeDef, default None): A list of fields to update whenever objects already exist. If an empty list is provided, it is equivalent to doing a bulk insert on the objects that don't exist. If None, all fields will be updated. To apply an expression, such as an F object, to a field when it is updated, use the pgbulk.UpdateField class. See examples below.
  • exclude (Union[List[str], None], default None): A list of fields to exclude from the upsert. This is useful when update_fields is None and you want to exclude fields from being updated. This is additive to the unique_fields list.
  • returning (Union[List[str], bool], default False): If True, returns all fields. If a list, only returns fields in the list. If False, do not return results from the upsert.
  • redundant_updates (bool, default False): Perform an update even if all columns are identical to the row in the database.

Returns:

  • UpsertResult: The upsert result, an iterable list of all upsert objects. Use the .updated and .created attributes to iterate over created or updated elements.
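
The interplay of unique_fields, update_fields and redundant_updates described above can be pictured with a toy in-memory model. This is only a sketch of the documented semantics over a list of dicts. The function upsert_sketch is hypothetical and says nothing about how pgbulk implements the operation in SQL.

```python
def upsert_sketch(table, rows, unique_fields, update_fields=None, redundant_updates=False):
    """Toy upsert over a list of dicts; returns (created, updated) row lists."""
    created, updated = [], []
    for row in rows:
        key = tuple(row[f] for f in unique_fields)
        existing = next(
            (r for r in table if tuple(r[f] for f in unique_fields) == key), None
        )
        if existing is None:
            table.append(dict(row))
            created.append(row)
            continue
        # None means "update every non-unique field"; [] means insert-only.
        if update_fields is None:
            fields = [f for f in row if f not in unique_fields]
        else:
            fields = update_fields
        changed = any(existing[f] != row[f] for f in fields)
        if fields and (changed or redundant_updates):
            existing.update({f: row[f] for f in fields})
            updated.append(row)
    return created, updated


table = [{"int_field": 1, "some_attr": "some_val1"}]
rows = [
    {"int_field": 1, "some_attr": "some_val1"},  # identical to the stored row
    {"int_field": 2, "some_attr": "some_val2"},  # new row
]

first = upsert_sketch(table, rows, ["int_field"], ["some_attr"])
print(len(first[0]), len(first[1]))  # 1 0

second = upsert_sketch(table, rows, ["int_field"], ["some_attr"], redundant_updates=True)
print(len(second[0]), len(second[1]))  # 0 2
```

In the first call the identical row is skipped, so nothing is reported as updated. With redundant_updates=True both existing rows are rewritten and reported.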

Note

Model signals such as post_save are not emitted.

Example

A basic bulk upsert on a model:

import pgbulk

pgbulk.upsert(
    MyModel,
    [
        MyModel(int_field=1, some_attr="some_val1"),
        MyModel(int_field=2, some_attr="some_val2"),
    ],
    # These are the fields that identify the uniqueness constraint.
    ["int_field"],
    # These are the fields that will be updated if the row already
    # exists. If not provided, all fields will be updated
    ["some_attr"]
)
Example

Return the results of an upsert:

results = pgbulk.upsert(
    MyModel,
    [
        MyModel(int_field=1, some_attr="some_val1"),
        MyModel(int_field=2, some_attr="some_val2"),
    ],
    ["int_field"],
    ["some_attr"],
    # `True` will return all columns. One can also explicitly
    # list which columns will be returned
    returning=True
)

# Print which results were created
print(results.created)

# Print which results were updated.
# By default, if an update results in no changes, it will not
# be updated and will not be returned.
print(results.updated)
Example

Upsert values and update rows even when the update is meaningless (i.e. a redundant update). This is turned off by default, but it can be enabled when Postgres triggers or other processes need to run as a result of an update:

pgbulk.upsert(
    MyModel,
    [
        MyModel(int_field=1, some_attr="some_val1"),
        MyModel(int_field=2, some_attr="some_val2"),
    ],
    ["int_field"],
    ["some_attr"],
    # Perform updates in the database even if it's identical
    redundant_updates=True
)
Example

Use an expression for a field if an update happens. In the example below, we increment some_int_field by one whenever an existing row is updated. When a row is created instead, the field takes the supplied value of zero:

results = pgbulk.upsert(
    MyModel,
    [
        MyModel(some_int_field=0, some_key="a"),
        MyModel(some_int_field=0, some_key="b")
    ],
    ["some_key"],
    [
        # Use UpdateField to specify an expression for the update.
        pgbulk.UpdateField(
            "some_int_field",
            expression=models.F("some_int_field") + 1
        )
    ],
)
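
The effect of the expression over repeated upserts can be traced with a toy in-memory model. Here a Python callable on the stored row stands in for the Django F expression, and upsert_with_expressions is a hypothetical name. The sketch assumes the expression reads the existing row's value, as the increment example implies.

```python
def upsert_with_expressions(table, rows, unique_field, expressions):
    """Toy upsert where each update field is computed from the stored row.

    `expressions` maps a field name to a function of the stored row,
    standing in for a Django F() expression.
    """
    for row in rows:
        existing = next((r for r in table if r[unique_field] == row[unique_field]), None)
        if existing is None:
            # Inserts use the values supplied on the object.
            table.append(dict(row))
        else:
            # Updates ignore the supplied value and evaluate the expression.
            for field, expression in expressions.items():
                existing[field] = expression(existing)


table = []
rows = [
    {"some_key": "a", "some_int_field": 0},
    {"some_key": "b", "some_int_field": 0},
]
increment = {"some_int_field": lambda stored: stored["some_int_field"] + 1}

upsert_with_expressions(table, rows, "some_key", increment)  # both rows inserted
upsert_with_expressions(table, rows, "some_key", increment)  # both rows updated
upsert_with_expressions(table, rows[:1], "some_key", increment)  # only "a" updated

print([(r["some_key"], r["some_int_field"]) for r in table])
# [('a', 2), ('b', 1)]
```
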
Source code in pgbulk/core.py
def upsert(
    queryset: Union[Type[models.Model], models.QuerySet],
    model_objs: Iterable[models.Model],
    unique_fields: List[str],
    update_fields: UpdateFieldsTypeDef = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Union[List[str], bool] = False,
    redundant_updates: bool = False,
) -> UpsertResult:
    """
    Perform a bulk upsert.

    Args:
        queryset: A model or a queryset that defines the
            collection to upsert
        model_objs: An iterable of Django models to upsert. All models
            in this list will be bulk upserted.
        unique_fields: A list of fields that define the uniqueness
            of the model. The model must have a unique constraint on these
            fields
        update_fields: A list of fields to update whenever objects already exist.
            If an empty list is provided, it is equivalent to doing a bulk insert on
            the objects that don't exist. If `None`, all fields will be updated.
            If you want to perform an expression such as an `F` object on a field when
            it is updated, use the [pgbulk.UpdateField][] class. See examples below.
        exclude: A list of fields to exclude from the upsert. This is useful
            when `update_fields` is `None` and you want to exclude fields from
            being updated. This is additive to the `unique_fields` list.
        returning: If True, returns all fields. If a list, only returns fields
            in the list. If False, do not return results from the upsert.
        redundant_updates: Perform an update
            even if all columns are identical to the row in the database.

    Returns:
        The upsert result, an iterable list of all upsert objects. Use the `.updated`
            and `.created` attributes to iterate over created or updated elements.

    Note:
        Model signals such as `post_save` are not emitted.

    Example:
        A basic bulk upsert on a model:

            import pgbulk

            pgbulk.upsert(
                MyModel,
                [
                    MyModel(int_field=1, some_attr="some_val1"),
                    MyModel(int_field=2, some_attr="some_val2"),
                ],
                # These are the fields that identify the uniqueness constraint.
                ["int_field"],
                # These are the fields that will be updated if the row already
                # exists. If not provided, all fields will be updated
                ["some_attr"]
            )

    Example:
        Return the results of an upsert:

            results = pgbulk.upsert(
                MyModel,
                [
                    MyModel(int_field=1, some_attr="some_val1"),
                    MyModel(int_field=2, some_attr="some_val2"),
                ],
                ["int_field"],
                ["some_attr"],
                # `True` will return all columns. One can also explicitly
                # list which columns will be returned
                returning=True
            )

            # Print which results were created
            print(results.created)

            # Print which results were updated.
            # By default, if an update results in no changes, it will not
            # be updated and will not be returned.
            print(results.updated)

    Example:
        Upsert values and update rows even when the update is meaningless
        (i.e. a redundant update). This is turned off by default, but it
        can be enabled in case postgres triggers or other processes
        need to happen as a result of an update:

            pgbulk.upsert(
                MyModel,
                [
                    MyModel(int_field=1, some_attr="some_val1"),
                    MyModel(int_field=2, some_attr="some_val2"),
                ],
                ["int_field"],
                ["some_attr"],
                # Perform updates in the database even if it's identical
                redundant_updates=True
            )

    Example:
        Use an expression for a field if an update happens. In the example
        below, we increment `some_int_field` by one whenever an update happens.
        Otherwise it defaults to zero:

            results = pgbulk.upsert(
                MyModel,
                [
                    MyModel(some_int_field=0, some_key="a"),
                    MyModel(some_int_field=0, some_key="b")
                ],
                ["some_key"],
                [
                    # Use UpdateField to specify an expression for the update.
                    pgbulk.UpdateField(
                        "some_int_field",
                        expression=models.F("some_int_field") + 1
                    )
                ],
            )
    """
    return _upsert(
        queryset,
        model_objs,
        unique_fields,
        update_fields=update_fields,
        returning=returning,
        exclude=exclude,
        redundant_updates=redundant_updates,
    )