Skip to content

Reference

pgbulk

Bulk Postgres upsert and update functions:

pgbulk.UpdateField

Bases: str

For expressing an update field as an expression to an upsert operation.

Example:

results = pgbulk.upsert(
    MyModel,
    [
        MyModel(some_int_field=0, some_key="a"),
        MyModel(some_int_field=0, some_key="b")
    ],
    ["some_key"],
    [
        pgbulk.UpdateField(
            "some_int_field",
            expression=models.F('some_int_field') + 1
        )
    ],
)

pgbulk.UpsertResult

Bases: List['Row']

Returned by pgbulk.upsert when the returning argument is provided.

Wraps a list of named tuples where the names correspond to the underlying Django model attribute names.

Also provides properties to access created and updated rows.

created property

created: List[Row]

Return the created rows

updated property

updated: List[Row]

Return the updated rows

pgbulk.acopy async

acopy(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    copy_fields: UpdateFieldsTypeDef = None,
    *,
    exclude: Union[List[str], None] = None,
    binary: bool = False
) -> None

Asynchronously copy data into a table.

See pgbulk.copy

Note

Like other async Django ORM methods, acopy currently wraps copy in a sync_to_async wrapper. It does not yet use an asynchronous database driver but will in the future.

Source code in pgbulk/core.py
async def acopy(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    copy_fields: UpdateFieldsTypeDef = None,
    *,
    exclude: Union[List[str], None] = None,
    binary: bool = False,
) -> None:
    """
    Asynchronously copy data into a table.

    See [pgbulk.copy][]

    Note:
        Like other async Django ORM methods, `acopy` currently wraps `copy` in
        a `sync_to_async` wrapper. It does not yet use an asynchronous database
        driver but will in the future.
    """
    return await sync_to_async(copy)(
        queryset,
        model_objs,
        copy_fields=copy_fields,
        exclude=exclude,
        binary=binary,
    )

pgbulk.aupdate async

aupdate(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    update_fields: Union[List[str], None] = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Union[List[str], Literal[True]],
    ignore_unchanged: bool = False
) -> List[Row]
aupdate(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    update_fields: Union[List[str], None] = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Literal[False] = False,
    ignore_unchanged: bool = False
) -> None
aupdate(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    update_fields: Union[List[str], None] = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Union[List[str], bool] = False,
    ignore_unchanged: bool = False
) -> Union[List[Row], None]
aupdate(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    update_fields: Union[List[str], None] = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Union[List[str], bool] = False,
    ignore_unchanged: bool = False
) -> Union[List[Row], None]

Perform an asynchronous bulk update.

See pgbulk.update

Note

Like other async Django ORM methods, aupdate currently wraps update in a sync_to_async wrapper. It does not yet use an asynchronous database driver but will in the future.

Source code in pgbulk/core.py
async def aupdate(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    update_fields: Union[List[str], None] = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Union[List[str], bool] = False,
    ignore_unchanged: bool = False,
) -> Union[List["Row"], None]:
    """
    Perform an asynchronous bulk update.

    See [pgbulk.update][]

    Note:
        Like other async Django ORM methods, `aupdate` currently wraps `update` in
        a `sync_to_async` wrapper. It does not yet use an asynchronous database
        driver but will in the future.
    """
    return await sync_to_async(update)(
        queryset,
        model_objs,
        update_fields=update_fields,
        exclude=exclude,
        returning=returning,
        ignore_unchanged=ignore_unchanged,
    )

pgbulk.aupsert async

aupsert(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    unique_fields: List[str],
    update_fields: UpdateFieldsTypeDef = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Union[List[str], Literal[True]],
    ignore_unchanged: bool = False
) -> UpsertResult
aupsert(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    unique_fields: List[str],
    update_fields: UpdateFieldsTypeDef = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Literal[False] = False,
    ignore_unchanged: bool = False
) -> None
aupsert(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    unique_fields: List[str],
    update_fields: UpdateFieldsTypeDef = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Union[List[str], bool] = False,
    ignore_unchanged: bool = False
) -> Union[UpsertResult, None]
aupsert(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    unique_fields: List[str],
    update_fields: UpdateFieldsTypeDef = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Union[List[str], bool] = False,
    ignore_unchanged: bool = False
) -> Union[UpsertResult, None]

Perform an asynchronous bulk upsert.

See pgbulk.upsert

Note

Like other async Django ORM methods, aupsert currently wraps upsert in a sync_to_async wrapper. It does not yet use an asynchronous database driver but will in the future.

Source code in pgbulk/core.py
async def aupsert(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    unique_fields: List[str],
    update_fields: UpdateFieldsTypeDef = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Union[List[str], bool] = False,
    ignore_unchanged: bool = False,
) -> Union[UpsertResult, None]:
    """
    Perform an asynchronous bulk upsert.

    See [pgbulk.upsert][]

    Note:
        Like other async Django ORM methods, `aupsert` currently wraps `upsert` in
        a `sync_to_async` wrapper. It does not yet use an asynchronous database
        driver but will in the future.
    """
    return await sync_to_async(upsert)(
        queryset,
        model_objs,
        unique_fields=unique_fields,
        update_fields=update_fields,
        returning=returning,
        exclude=exclude,
        ignore_unchanged=ignore_unchanged,
    )

pgbulk.copy

copy(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    copy_fields: UpdateFieldsTypeDef = None,
    *,
    exclude: Union[List[str], None] = None,
    binary: bool = False
) -> None

Copy data into a table.

Parameters:

Name Type Description Default
queryset QuerySet[_M]

queryset: A model or a queryset for the table being copied into.

required
model_objs Iterable[_M]

An iterable of Django models to copy.

required
copy_fields UpdateFieldsTypeDef

A list of fields on the model objects to copy. If None, all fields will be copied.

None
exclude Union[List[str], None]

A list of fields to exclude from the copy. This is useful when copy_fields is None and you want to exclude fields from being copied.

None
binary bool

If True, copy data in binary format. This can yield improved performance for large datasets.

False
Note

Model signals such as post_save are not emitted.

Source code in pgbulk/core.py
def copy(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    copy_fields: UpdateFieldsTypeDef = None,
    *,
    exclude: Union[List[str], None] = None,
    binary: bool = False,
) -> None:
    """
    Copy data into a table.

    Args:
        queryset: queryset: A model or a queryset for the table being copied into.
        model_objs: An iterable of Django models to copy.
        copy_fields: A list of fields on the model objects to copy.
            If `None`, all fields will be copied.
        exclude: A list of fields to exclude from the copy. This is useful
            when `copy_fields` is `None` and you want to exclude fields from
            being copied.
        binary: If True, copy data in binary format.
            This can yield improved performance for large datasets.

    Note:
        Model signals such as `post_save` are not emitted.
    """
    if psycopg_maj_version == 2:  # pragma: no cover
        raise RuntimeError("Only psycopg3 is supported for pgbulk.copy.")

    queryset = queryset if isinstance(queryset, models.QuerySet) else queryset.objects.all()
    model = queryset.model

    # Populate automatically-generated fields in the rows like date times
    _fill_auto_fields(queryset, model_objs)

    # Determine which fields should be copied
    fields = [
        model._meta.get_field(field)
        for field in _filter_fields(
            queryset, copy_fields, exclude=exclude, exclude_non_updatable=False
        )
    ]
    cols = [field.column for field in fields]

    connection = connections[queryset.db]
    with connection.cursor() as cursor:
        all_field_names_sql = ", ".join([_quote(col, cursor) for col in cols])
        copy_sql = (
            f"COPY {_quote(model._meta.db_table, cursor)} ({all_field_names_sql}) FROM STDIN"
        )
        if binary:
            copy_sql += " WITH (FORMAT BINARY)"

        with cursor.copy(copy_sql) as copier:  # type: ignore
            if binary:
                postgres_types = _postgres_types_for_fields(fields, connection)  # type: ignore
                copier.set_types(postgres_types)  # type: ignore

            for model_obj in model_objs:
                row = _get_values_for_row(queryset, model_obj, fields, copying=True)
                copier.write_row(row)  # type: ignore

pgbulk.update

update(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    update_fields: Union[List[str], None] = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Union[List[str], Literal[True]],
    ignore_unchanged: bool = False
) -> List[Row]
update(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    update_fields: Union[List[str], None] = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Literal[False] = False,
    ignore_unchanged: bool = False
) -> None
update(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    update_fields: Union[List[str], None] = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Union[List[str], bool] = False,
    ignore_unchanged: bool = False
) -> Union[List[Row], None]
update(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    update_fields: Union[List[str], None] = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Union[List[str], bool] = False,
    ignore_unchanged: bool = False
) -> Union[List[Row], None]

Performs a bulk update.

Parameters:

Name Type Description Default
queryset QuerySet[_M]

A model or a queryset for the table being updated.

required
model_objs Iterable[_M]

Model object values to use for the update.

required
update_fields Union[List[str], None]

A list of fields on the model objects to update. If None, all fields will be updated.

None
exclude Union[List[str], None]

A list of fields to exclude from the update. This is useful when update_fields is None and you want to exclude fields from being updated.

None
returning Union[List[str], bool]

If True, returns all fields. If a list, only returns fields in the list. If False, do not return results from the upsert.

False
ignore_unchanged bool

Ignore unchanged rows in updates.

False
Note

Model signals such as post_save are not emitted.

Returns:

Type Description
Union[List[Row], None]

If returning=True, an iterable list of all updated objects.

Source code in pgbulk/core.py
def update(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    update_fields: Union[List[str], None] = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Union[List[str], bool] = False,
    ignore_unchanged: bool = False,
) -> Union[List["Row"], None]:
    """
    Performs a bulk update.

    Args:
        queryset: A model or a queryset for the table being updated.
        model_objs: Model object values to use for the update.
        update_fields: A list of fields on the model objects to update.
            If `None`, all fields will be updated.
        exclude: A list of fields to exclude from the update. This is useful
            when `update_fields` is `None` and you want to exclude fields from
            being updated.
        returning: If True, returns all fields. If a list, only returns fields
            in the list. If False, do not return results from the upsert.
        ignore_unchanged: Ignore unchanged rows in updates.

    Note:
        Model signals such as `post_save` are not emitted.

    Returns:
        If `returning=True`, an iterable list of all updated objects.
    """
    queryset = queryset if isinstance(queryset, models.QuerySet) else queryset.objects.all()
    with connections[queryset.db].cursor() as cursor:
        return _update(
            queryset=queryset,
            model_objs=model_objs,
            update_fields=update_fields,
            exclude=exclude,
            returning=returning,
            ignore_unchanged=ignore_unchanged,
            cursor=cursor,
        )

pgbulk.upsert

upsert(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    unique_fields: List[str],
    update_fields: UpdateFieldsTypeDef = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Union[List[str], Literal[True]],
    ignore_unchanged: bool = False
) -> UpsertResult
upsert(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    unique_fields: List[str],
    update_fields: UpdateFieldsTypeDef = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Literal[False] = False,
    ignore_unchanged: bool = False
) -> None
upsert(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    unique_fields: List[str],
    update_fields: UpdateFieldsTypeDef = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Union[List[str], bool] = False,
    ignore_unchanged: bool = False
) -> Union[UpsertResult, None]
upsert(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    unique_fields: List[str],
    update_fields: UpdateFieldsTypeDef = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Union[List[str], bool] = False,
    ignore_unchanged: bool = False
) -> Union[UpsertResult, None]

Perform a bulk upsert.

Parameters:

Name Type Description Default
queryset QuerySet[_M]

A model or a queryset for the table being upserted.

required
model_objs Iterable[_M]

An iterable of Django models to upsert. All models in this list will be bulk upserted.

required
unique_fields List[str]

A list of fields that define the uniqueness of the model. The model must have a unique constraint on these fields.

required
update_fields UpdateFieldsTypeDef

A list of fields to update whenever objects already exist. If an empty list is provided, it is equivalent to doing a bulk insert on the objects that don't exist. If None, all fields will be updated. If you want to perform an expression such as an F object on a field when it is updated, use the pgbulk.UpdateField class. See examples below.

None
exclude Union[List[str], None]

A list of fields to exclude from the upsert. This is useful when update_fields is None and you want to exclude fields from being updated. This is additive to the unique_fields list.

None
returning Union[List[str], bool]

If True, returns all fields. If a list, only returns fields in the list. If False, do not return results from the upsert.

False
ignore_unchanged bool

Ignore unchanged rows in updates.

False

Returns:

Type Description
Union[UpsertResult, None]

If returning=True, the upserted result, an iterable list of all upsert objects. Use the .updated and .created attributes to iterate over created or updated elements.

Note

Model signals such as post_save are not emitted.

Source code in pgbulk/core.py
def upsert(
    queryset: QuerySet[_M],
    model_objs: Iterable[_M],
    unique_fields: List[str],
    update_fields: UpdateFieldsTypeDef = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Union[List[str], bool] = False,
    ignore_unchanged: bool = False,
) -> Union[UpsertResult, None]:
    """
    Perform a bulk upsert.

    Args:
        queryset: A model or a queryset for the table being upserted.
        model_objs: An iterable of Django models to upsert. All models
            in this list will be bulk upserted.
        unique_fields: A list of fields that define the uniqueness
            of the model. The model must have a unique constraint on these
            fields.
        update_fields: A list of fields to update whenever objects already exist.
            If an empty list is provided, it is equivalent to doing a bulk insert on
            the objects that don't exist. If `None`, all fields will be updated.
            If you want to perform an expression such as an `F` object on a field when
            it is updated, use the [pgbulk.UpdateField][] class. See examples below.
        exclude: A list of fields to exclude from the upsert. This is useful
            when `update_fields` is `None` and you want to exclude fields from
            being updated. This is additive to the `unique_fields` list.
        returning: If True, returns all fields. If a list, only returns fields
            in the list. If False, do not return results from the upsert.
        ignore_unchanged: Ignore unchanged rows in updates.

    Returns:
        If `returning=True`, the upserted result, an iterable list of all upsert objects.
            Use the `.updated` and `.created` attributes to iterate over created or updated
            elements.

    Note:
        Model signals such as `post_save` are not emitted.
    """
    queryset = queryset if isinstance(queryset, models.QuerySet) else queryset.objects.all()
    with connections[queryset.db].cursor() as cursor:
        return _upsert(
            queryset,
            model_objs,
            unique_fields=unique_fields,
            update_fields=update_fields,
            returning=returning,
            exclude=exclude,
            ignore_unchanged=ignore_unchanged,
            cursor=cursor,
        )