Skip to content

User Guide

pgbulk

pgbulk.UpdateField

UpdateField(field: str, expression: Union[models.Expression, None] = None)

Bases: UserString

For expressing an update field as an expression to an upsert operation.

Example:

results = pgbulk.upsert(
    MyModel,
    [
        MyModel(some_int_field=0, some_key="a"),
        MyModel(some_int_field=0, some_key="b")
    ],
    ["some_key"],
    [
        pgbulk.UpdateField(
            "some_int_field",
            expression=models.F('some_int_field') + 1
        )
    ],
)
Source code in pgbulk/core.py
def __init__(self, field: str, expression: Union[models.Expression, None] = None):
    self.data = field
    self.expression = expression

pgbulk.UpsertResult

Bases: list

Returned by pgbulk.upsert when the returning argument is provided.

Wraps a list of named tuples where the names correspond to the underlying Django model attribute names.

Also provides properties to access created and updated rows.

created property

created: List[namedtuple]

Return the created rows

updated property

updated: List[namedtuple]

Return the updated rows

pgbulk.aupdate async

aupdate(
    queryset: Union[models.Model, models.QuerySet],
    model_objs: List[models.Model],
    update_fields: Union[List[str], None] = None,
) -> None

Perform an asynchronous bulk update.

See pgbulk.update

Source code in pgbulk/core.py
async def aupdate(
    queryset: Union[models.Model, models.QuerySet],
    model_objs: List[models.Model],
    update_fields: Union[List[str], None] = None,
) -> None:
    """
    Perform an asynchronous bulk update.

    See [pgbulk.update][]
    """
    return await sync_to_async(update)(queryset, model_objs, update_fields)

pgbulk.aupsert async

aupsert(
    queryset: Union[models.Model, models.QuerySet],
    model_objs: List[models.Model],
    unique_fields: List[str],
    update_fields: Union[List[str], None] = None,
    *,
    returning: Union[List[str], bool] = False,
    redundant_updates: bool = False
) -> UpsertResult

Perform an asynchronous bulk upsert.

See pgbulk.upsert

Source code in pgbulk/core.py
async def aupsert(
    queryset: Union[models.Model, models.QuerySet],
    model_objs: List[models.Model],
    unique_fields: List[str],
    update_fields: Union[List[str], None] = None,
    *,
    returning: Union[List[str], bool] = False,
    redundant_updates: bool = False,
) -> UpsertResult:
    """
    Perform an asynchronous bulk upsert.

    See [pgbulk.upsert][]
    """
    return await sync_to_async(upsert)(
        queryset,
        model_objs,
        unique_fields,
        update_fields,
        returning=returning,
        redundant_updates=redundant_updates,
    )

pgbulk.update

update(
    queryset: Union[models.Model, models.QuerySet],
    model_objs: List[models.Model],
    update_fields: Union[List[str], None] = None,
) -> None

Performs a bulk update.

Parameters:

Name Type Description Default
queryset Union[Model, QuerySet]

The queryset to use when bulk updating

required
model_objs List[Model]

Model object values to use for the update

required
update_fields Union[List[str], None]

A list of fields on the model objects to update. If None, all fields will be updated.

None
Example

Update an attribute of multiple models in bulk::

import pgbulk

pgbulk.update(
    MyModel,
    [
        MyModel(id=1, some_attr='some_val1'),
        MyModel(id=2, some_attr='some_val2')
    ],
    # These are the fields that will be updated. If not provided,
    # all fields will be updated
    ['some_attr']
)
Source code in pgbulk/core.py
def update(
    queryset: Union[models.Model, models.QuerySet],
    model_objs: List[models.Model],
    update_fields: Union[List[str], None] = None,
) -> None:
    """
    Performs a bulk update.

    Args:
        queryset: The queryset to use when bulk updating
        model_objs: Model object values to use for the update
        update_fields: A list of fields on the
            model objects to update. If `None`, all fields will be updated.

    Example:
        Update an attribute of multiple models in bulk::

            import pgbulk

            pgbulk.update(
                MyModel,
                [
                    MyModel(id=1, some_attr='some_val1'),
                    MyModel(id=2, some_attr='some_val2')
                ],
                # These are the fields that will be updated. If not provided,
                # all fields will be updated
                ['some_attr']
            )
    """
    queryset = queryset if isinstance(queryset, models.QuerySet) else queryset.objects.all()
    connection = connections[queryset.db]
    model = queryset.model
    update_fields = _get_update_fields(queryset, update_fields)

    # Sort the model objects to reduce the likelihood of deadlocks
    model_objs = sorted(model_objs, key=lambda obj: obj.pk)

    # Add the pk to the value fields so we can join during the update
    value_fields = [model._meta.pk.attname] + update_fields

    row_values = [
        [
            _get_field_db_val(
                queryset,
                model_obj._meta.get_field(field),
                getattr(model_obj, model_obj._meta.get_field(field).attname),
                connection,
            )
            for field in value_fields
        ]
        for model_obj in model_objs
    ]

    # If we do not have any values or fields to update, just return
    if len(row_values) == 0 or len(update_fields) == 0:
        return []

    db_types = [model._meta.get_field(field).db_type(connection) for field in value_fields]

    value_fields_sql = ", ".join(
        '"{field}"'.format(field=model._meta.get_field(field).column) for field in value_fields
    )

    update_fields_sql = ", ".join(
        [
            '"{field}" = "new_values"."{field}"'.format(field=model._meta.get_field(field).column)
            for field in update_fields
        ]
    )

    values_sql = ", ".join(
        [
            "({0})".format(
                ", ".join(
                    [
                        "%s::{0}".format(db_types[i]) if not row_number and i else "%s"
                        for i, _ in enumerate(row)
                    ]
                )
            )
            for row_number, row in enumerate(row_values)
        ]
    )

    update_sql = (
        "UPDATE {table} "
        "SET {update_fields_sql} "
        "FROM (VALUES {values_sql}) AS new_values ({value_fields_sql}) "
        'WHERE "{table}"."{pk_field}" = "new_values"."{pk_field}"'
    ).format(
        table=model._meta.db_table,
        pk_field=model._meta.pk.column,
        update_fields_sql=update_fields_sql,
        values_sql=values_sql,
        value_fields_sql=value_fields_sql,
    )

    update_sql_params = list(itertools.chain(*row_values))

    with connection.cursor() as cursor:
        update_sql_params = _prep_sql_args(queryset, connection, cursor, update_sql_params)
        return cursor.execute(update_sql, update_sql_params)

pgbulk.upsert

upsert(
    queryset: Union[models.Model, models.QuerySet],
    model_objs: List[models.Model],
    unique_fields: List[str],
    update_fields: Union[List[str], None] = None,
    *,
    returning: Union[List[str], bool] = False,
    redundant_updates: bool = False
) -> UpsertResult

Perform a bulk upsert.

Parameters:

Name Type Description Default
queryset Union[Model, QuerySet]

A model or a queryset that defines the collection to upsert

required
model_objs List[Model]

A list of Django models to upsert. All models in this list will be bulk upserted.

required
unique_fields List[str]

A list of fields that define the uniqueness of the model. The model must have a unique constraint on these fields

required
update_fields Union[List[str], None]

A list of fields to update whenever objects already exist. If an empty list is provided, it is equivalent to doing a bulk insert on the objects that don't exist. If None, all fields will be updated. If you want to perform an expression such as an F object on a field when it is updated, use the pgbulk.UpdateField class. See examples below.

None
returning Union[List[str], bool]

If True, returns all fields. If a list, only returns fields in the list. If False, do not return results from the upsert.

False
redundant_updates bool

Perform an update even if all columns are identical to the row in the database.

False

Returns:

Type Description
UpsertResult

The upsert result, an iterable list of all upsert objects. Use the .updated and .created attributes to iterate over created or updated elements.

Example

A basic bulk upsert on a model:

import pgbulk

pgbulk.upsert(
    MyModel,
    [
        MyModel(int_field=1, some_attr="some_val1"),
        MyModel(int_field=2, some_attr="some_val2"),
    ],
    # These are the fields that identify the uniqueness constraint.
    ["int_field"],
    # These are the fields that will be updated if the row already
    # exists. If not provided, all fields will be updated
    ["some_attr"]
)
Example

Return the results of an upsert:

results = pgbulk.upsert(
    MyModel,
    [
        MyModel(int_field=1, some_attr="some_val1"),
        MyModel(int_field=2, some_attr="some_val2"),
    ],
    ["int_field"],
    ["some_attr"],
    # `True` will return all columns. One can also explicitly
    # list which columns will be returned
    returning=True
)

# Print which results were created
print(results.created)

# Print which results were updated.
# By default, if an update results in no changes, it will not
# be updated and will not be returned.
print(results.updated)
Example

Upsert values and update rows even when the update is meaningless (i.e. a redundant update). This is turned off by default, but it can be enabled in case postgres triggers or other processes need to happen as a result of an update:

pgbulk.upsert(
    MyModel,
    [
        MyModel(int_field=1, some_attr="some_val1"),
        MyModel(int_field=2, some_attr="some_val2"),
    ],
    ["int_field"],
    ["some_attr"],
    # Perform updates in the database even if it's identical
    redundant_updates=True
)
Example

Use an expression for a field if an update happens. In the example below, we increment some_int_field by one whenever an update happens. Otherwise it defaults to zero:

results = pgbulk.upsert(
    MyModel,
    [
        MyModel(some_int_field=0, some_key="a"),
        MyModel(some_int_field=0, some_key="b")
    ],
    ["some_key"],
    [
        # Use UpdateField to specify an expression for the update.
        pgbulk.UpdateField(
            "some_int_field",
            expression=models.F("some_int_field") + 1
        )
    ],
)
Source code in pgbulk/core.py
def upsert(
    queryset: Union[models.Model, models.QuerySet],
    model_objs: List[models.Model],
    unique_fields: List[str],
    update_fields: Union[List[str], None] = None,
    *,
    returning: Union[List[str], bool] = False,
    redundant_updates: bool = False,
) -> UpsertResult:
    """
    Perform a bulk upsert.

    Args:
        queryset: A model or a queryset that defines the
            collection to upsert
        model_objs: A list of Django models to upsert. All models
            in this list will be bulk upserted.
        unique_fields: A list of fields that define the uniqueness
            of the model. The model must have a unique constraint on these
            fields
        update_fields: A list of fields to update whenever objects already exist.
            If an empty list is provided, it is equivalent to doing a bulk insert on
            the objects that don't exist. If `None`, all fields will be updated.
            If you want to perform an expression such as an `F` object on a field when
            it is updated, use the [pgbulk.UpdateField][] class. See examples below.
        returning: If True, returns all fields. If a list, only returns fields
            in the list. If False, do not return results from the upsert.
        redundant_updates: Perform an update
            even if all columns are identical to the row in the database.

    Returns:
        The upsert result, an iterable list of all upsert objects. Use the `.updated`
            and `.created` attributes to iterate over created or updated elements.

    Example:
        A basic bulk upsert on a model:

            import pgbulk

            pgbulk.upsert(
                MyModel,
                [
                    MyModel(int_field=1, some_attr="some_val1"),
                    MyModel(int_field=2, some_attr="some_val2"),
                ],
                # These are the fields that identify the uniqueness constraint.
                ["int_field"],
                # These are the fields that will be updated if the row already
                # exists. If not provided, all fields will be updated
                ["some_attr"]
            )

    Example:
        Return the results of an upsert:

            results = pgbulk.upsert(
                MyModel,
                [
                    MyModel(int_field=1, some_attr="some_val1"),
                    MyModel(int_field=2, some_attr="some_val2"),
                ],
                ["int_field"],
                ["some_attr"],
                # `True` will return all columns. One can also explicitly
                # list which columns will be returned
                returning=True
            )

            # Print which results were created
            print(results.created)

            # Print which results were updated.
            # By default, if an update results in no changes, it will not
            # be updated and will not be returned.
            print(results.updated)

    Example:
        Upsert values and update rows even when the update is meaningless
        (i.e. a redundant update). This is turned off by default, but it
        can be enabled in case postgres triggers or other processes
        need to happen as a result of an update:

            pgbulk.upsert(
                MyModel,
                [
                    MyModel(int_field=1, some_attr="some_val1"),
                    MyModel(int_field=2, some_attr="some_val2"),
                ],
                ["int_field"],
                ["some_attr"],
                # Perform updates in the database even if it's identical
                redundant_updates=True
            )

    Example:
        Use an expression for a field if an update happens. In the example
        below, we increment `some_int_field` by one whenever an update happens.
        Otherwise it defaults to zero:

            results = pgbulk.upsert(
                MyModel,
                [
                    MyModel(some_int_field=0, some_key="a"),
                    MyModel(some_int_field=0, some_key="b")
                ],
                ["some_key"],
                [
                    # Use UpdateField to specify an expression for the update.
                    pgbulk.UpdateField(
                        "some_int_field",
                        expression=models.F("some_int_field") + 1
                    )
                ],
            )
    """
    return _upsert(
        queryset,
        model_objs,
        unique_fields,
        update_fields=update_fields,
        returning=returning,
        redundant_updates=redundant_updates,
    )