User Guide
pgbulk
Bulk Postgres upsert and update functions.
Briefly, these are the core functions and objects:
- Use pgbulk.upsert to do a native Postgres INSERT ON CONFLICT statement.
- Use pgbulk.update to do a native Postgres bulk UPDATE statement.
- Use pgbulk.aupsert or pgbulk.aupdate for async versions of these functions.
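To make the INSERT ON CONFLICT statement mentioned above more concrete, here is a minimal sketch that builds a parameterized upsert statement of the kind Postgres accepts. It is an illustration only: build_upsert_sql is a hypothetical helper, not part of pgbulk, and the SQL that pgbulk actually generates may differ.

```python
from typing import List


def build_upsert_sql(
    table: str,
    columns: List[str],
    unique_fields: List[str],
    update_fields: List[str],
    num_rows: int,
) -> str:
    """Build a parameterized Postgres upsert statement (illustrative only).

    Identifiers are interpolated directly, so pass trusted names only.
    """
    # One "(%s, %s, ...)" placeholder group per row.
    row = "(" + ", ".join(["%s"] * len(columns)) + ")"
    values = ", ".join([row] * num_rows)
    conflict_target = ", ".join(unique_fields)
    if update_fields:
        # EXCLUDED refers to the row that was proposed for insertion.
        assignments = ", ".join(f"{f} = EXCLUDED.{f}" for f in update_fields)
        action = f"DO UPDATE SET {assignments}"
    else:
        # With nothing to update, conflicting rows are left untouched.
        action = "DO NOTHING"
    return (
        f"INSERT INTO {table} ({', '.join(columns)}) "
        f"VALUES {values} "
        f"ON CONFLICT ({conflict_target}) {action}"
    )


upsert_sql = build_upsert_sql(
    "my_model", ["int_field", "some_attr"], ["int_field"], ["some_attr"], 2
)
print(upsert_sql)
```

The conflict target must match a unique constraint or unique index on the table, which is why the upsert function requires unique fields.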
pgbulk.upsert has other objects related to advanced usage:
- pgbulk.UpsertResult encapsulates created and updated values when using the returning flag of pgbulk.upsert.
- pgbulk.UpdateField allows one to specify expressions for updating fields in the upsert, for example, incrementing fields or conditionally ignoring updates.
pgbulk.UpdateField
Bases: UserString
For expressing an update field as an expression to an upsert operation.
Example:
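import pgbulk
from django.db import models

# MyModel stands for any Django model with a unique constraint on some_key.
results = pgbulk.upsert(
    MyModel,
    [
        MyModel(some_int_field=0, some_key="a"),
        MyModel(some_int_field=0, some_key="b"),
    ],
    ["some_key"],
    [
        pgbulk.UpdateField(
            "some_int_field",
            expression=models.F("some_int_field") + 1,
        )
    ],
)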
Source code in pgbulk/core.py
pgbulk.UpsertResult
Bases: list
Returned by pgbulk.upsert when the returning argument is provided.
Wraps a list of named tuples where the names correspond to the underlying Django model attribute names.
Also provides properties to access created and updated rows.
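The shape described above (a list subclass of named tuples with created and updated accessors) can be sketched in a few lines. This is a toy model, not pgbulk's implementation: ToyUpsertResult and the status field with its "c" and "u" values are assumptions made for illustration.

```python
from collections import namedtuple


class ToyUpsertResult(list):
    """A list of named tuples with helpers to split created and updated rows."""

    @property
    def created(self):
        # Assumption: each row carries a `status` of "c" when it was created.
        return [row for row in self if row.status == "c"]

    @property
    def updated(self):
        # Assumption: each row carries a `status` of "u" when it was updated.
        return [row for row in self if row.status == "u"]


# Field names mirror model attribute names; `status` is hypothetical.
Row = namedtuple("Row", ["int_field", "some_attr", "status"])

results = ToyUpsertResult(
    [Row(1, "some_val1", "c"), Row(2, "some_val2", "u")]
)
print([row.int_field for row in results.created])
print([row.some_attr for row in results.updated])
```

Because the result is still a list, it can be iterated, indexed, and measured with len like any other list of rows.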
pgbulk.aupdate
async
aupdate(
    queryset: Union[Type[Model], QuerySet],
    model_objs: Iterable[Model],
    update_fields: Union[List[str], None] = None,
    exclude: Union[List[str], None] = None,
) -> None
Perform an asynchronous bulk update.
See pgbulk.update
Note
Like other async Django ORM methods, aupdate currently wraps update in a sync_to_async wrapper. It does not yet use an asynchronous database driver but will in the future.
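The note above describes a common pattern: a blocking function is run off the event loop so that it can be awaited. The sketch below shows the idea using only the standard library (Python 3.9+). It uses asyncio.to_thread as a stand-in for sync_to_async, which has its own threading behavior, and blocking_update is a hypothetical placeholder for a synchronous database call.

```python
import asyncio
import threading
from typing import List


def blocking_update(values: List[int]) -> str:
    """Hypothetical stand-in for a synchronous database call."""
    # Report which thread ran the blocking work.
    return threading.current_thread().name


async def async_update(values: List[int]) -> str:
    # Run the blocking function in a worker thread so the event loop
    # stays free; the underlying call itself is still synchronous.
    return await asyncio.to_thread(blocking_update, values)


worker_name = asyncio.run(async_update([1, 2, 3]))
caller_name = threading.current_thread().name
print(worker_name != caller_name)
```

The awaited call returns only once the blocking work finishes, so this pattern frees the event loop for other tasks but does not make the database call itself any faster.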
Source code in pgbulk/core.py
pgbulk.aupsert
async
aupsert(
    queryset: Union[Type[Model], QuerySet],
    model_objs: Iterable[Model],
    unique_fields: List[str],
    update_fields: UpdateFieldsTypeDef = None,
    *,
    returning: Union[List[str], bool] = False,
    exclude: Union[List[str], None] = None,
    redundant_updates: bool = False
) -> UpsertResult
Perform an asynchronous bulk upsert.
See pgbulk.upsert
Note
Like other async Django ORM methods, aupsert currently wraps upsert in a sync_to_async wrapper. It does not yet use an asynchronous database driver but will in the future.
Source code in pgbulk/core.py
pgbulk.update
update(
    queryset: Union[Type[Model], QuerySet],
    model_objs: Iterable[Model],
    update_fields: Union[List[str], None] = None,
    exclude: Union[List[str], None] = None,
) -> None
Performs a bulk update.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
queryset | Union[Type[Model], QuerySet] | The model or queryset to use when bulk updating | required |
model_objs | Iterable[Model] | Model object values to use for the update | required |
update_fields | Union[List[str], None] | A list of fields on the model objects to update. If None, all fields are updated. | None |
exclude | Union[List[str], None] | A list of fields to exclude from the update. This is useful when ... | None |
Note
Model signals such as post_save are not emitted.
Example
Update an attribute of multiple models in bulk:
import pgbulk

pgbulk.update(
    MyModel,
    [
        MyModel(id=1, some_attr='some_val1'),
        MyModel(id=2, some_attr='some_val2')
    ],
    # These are the fields that will be updated. If not provided,
    # all fields will be updated
    ['some_attr']
)
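For readers who want a picture of what a single-statement bulk UPDATE can look like in Postgres, the sketch below builds one using UPDATE ... FROM (VALUES ...). This is one common way to express a multi-row update and is shown as an assumption, not as the SQL pgbulk emits. build_bulk_update_sql is a hypothetical helper, and real statements usually need type casts on the VALUES columns.

```python
from typing import List


def build_bulk_update_sql(
    table: str,
    pk: str,
    update_fields: List[str],
    num_rows: int,
) -> str:
    """Build a parameterized multi-row UPDATE statement (illustrative only).

    Identifiers are interpolated directly, so pass trusted names only.
    """
    # Each VALUES row carries the primary key followed by the new values.
    columns = [pk] + update_fields
    row = "(" + ", ".join(["%s"] * len(columns)) + ")"
    values = ", ".join([row] * num_rows)
    assignments = ", ".join(f"{f} = new_values.{f}" for f in update_fields)
    return (
        f"UPDATE {table} SET {assignments} "
        f"FROM (VALUES {values}) AS new_values ({', '.join(columns)}) "
        # Match each stored row to its replacement values by primary key.
        f"WHERE {table}.{pk} = new_values.{pk}"
    )


update_sql = build_bulk_update_sql("my_model", "id", ["some_attr"], 2)
print(update_sql)
```

The point of the pattern is that all rows are updated in one round trip instead of one UPDATE per object.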
Source code in pgbulk/core.py
pgbulk.upsert
upsert(
    queryset: Union[Type[Model], QuerySet],
    model_objs: Iterable[Model],
    unique_fields: List[str],
    update_fields: UpdateFieldsTypeDef = None,
    *,
    exclude: Union[List[str], None] = None,
    returning: Union[List[str], bool] = False,
    redundant_updates: bool = False
) -> UpsertResult
Perform a bulk upsert.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
queryset | Union[Type[Model], QuerySet] | A model or a queryset that defines the collection to upsert | required |
model_objs | Iterable[Model] | An iterable of Django models to upsert. All models in this list will be bulk upserted. | required |
unique_fields | List[str] | A list of fields that define the uniqueness of the model. The model must have a unique constraint on these fields. | required |
update_fields | UpdateFieldsTypeDef | A list of fields to update whenever objects already exist. If an empty list is provided, it is equivalent to doing a bulk insert on the objects that don't exist. If None, all fields are updated. | None |
exclude | Union[List[str], None] | A list of fields to exclude from the upsert. This is useful when ... | None |
returning | Union[List[str], bool] | If True, returns all fields. If a list, only returns fields in the list. If False, do not return results from the upsert. | False |
redundant_updates | bool | Perform an update even if all columns are identical to the row in the database. | False |
Returns:
Type | Description |
---|---|
UpsertResult | The upsert result, an iterable list of all upsert objects. Use the created and updated properties to access created and updated rows. |
Note
Model signals such as post_save are not emitted.
Example
A basic bulk upsert on a model:
import pgbulk

pgbulk.upsert(
    MyModel,
    [
        MyModel(int_field=1, some_attr="some_val1"),
        MyModel(int_field=2, some_attr="some_val2"),
    ],
    # These are the fields that identify the uniqueness constraint.
    ["int_field"],
    # These are the fields that will be updated if the row already
    # exists. If not provided, all fields will be updated
    ["some_attr"]
)
Example
Return the results of an upsert:
results = pgbulk.upsert(
    MyModel,
    [
        MyModel(int_field=1, some_attr="some_val1"),
        MyModel(int_field=2, some_attr="some_val2"),
    ],
    ["int_field"],
    ["some_attr"],
    # `True` will return all columns. One can also explicitly
    # list which columns will be returned
    returning=True
)

# Print which results were created
print(results.created)

# Print which results were updated.
# By default, if an update results in no changes, it will not
# be updated and will not be returned.
print(results.updated)
Example
Upsert values and update rows even when the update changes nothing (i.e. a redundant update). This is turned off by default, but it can be enabled in case Postgres triggers or other processes need to run as a result of an update:
pgbulk.upsert(
    MyModel,
    [
        MyModel(int_field=1, some_attr="some_val1"),
        MyModel(int_field=2, some_attr="some_val2"),
    ],
    ["int_field"],
    ["some_attr"],
    # Perform updates in the database even if it's identical
    redundant_updates=True
)
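The effect of redundant_updates on which rows count as created or updated can be modeled without a database. The following is an in-memory sketch of the semantics described in the examples above, not pgbulk's implementation; simulate_upsert and its list-of-dicts "table" are hypothetical.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


def simulate_upsert(
    table: List[Row],
    rows: List[Row],
    unique_fields: List[str],
    update_fields: List[str],
    redundant_updates: bool = False,
) -> Tuple[List[Row], List[Row]]:
    """Apply upsert semantics to a list of dicts; return (created, updated)."""
    created: List[Row] = []
    updated: List[Row] = []
    for new in rows:
        key = tuple(new[f] for f in unique_fields)
        existing = next(
            (r for r in table if tuple(r[f] for f in unique_fields) == key),
            None,
        )
        if existing is None:
            # No row with this key yet: insert it.
            table.append(dict(new))
            created.append(new)
            continue
        changed = any(existing[f] != new[f] for f in update_fields)
        # Identical rows are skipped unless redundant updates are requested.
        if changed or redundant_updates:
            for f in update_fields:
                existing[f] = new[f]
            updated.append(new)
    return created, updated


table = [{"int_field": 1, "some_attr": "some_val1"}]
rows = [
    {"int_field": 1, "some_attr": "some_val1"},  # identical to the stored row
    {"int_field": 2, "some_attr": "some_val2"},  # new row
]
created, updated = simulate_upsert(table, rows, ["int_field"], ["some_attr"])
created2, updated2 = simulate_upsert(
    table, rows, ["int_field"], ["some_attr"], redundant_updates=True
)
print(len(created), len(updated), len(created2), len(updated2))
```

In the first call the identical row is neither created nor updated, while the second call reports both existing rows as updated because redundant updates are forced.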
Example
Use an expression for a field if an update happens. In the example below, we increment some_int_field by one whenever an update happens. Otherwise it defaults to zero:
import pgbulk
from django.db import models

results = pgbulk.upsert(
    MyModel,
    [
        MyModel(some_int_field=0, some_key="a"),
        MyModel(some_int_field=0, some_key="b"),
    ],
    ["some_key"],
    [
        # Use UpdateField to specify an expression for the update.
        pgbulk.UpdateField(
            "some_int_field",
            expression=models.F("some_int_field") + 1,
        )
    ],
)
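The "insert the default, otherwise apply an expression" behavior in the example above can be pictured with a plain dictionary. This is a toy model of the semantics only: upsert_with_expression is a hypothetical helper, and each loop iteration stands for a separate upsert call.

```python
from typing import Callable, Dict


def upsert_with_expression(
    table: Dict[str, int],
    key: str,
    insert_value: int,
    on_update: Callable[[int], int],
) -> None:
    """Insert `insert_value` for a new key, else apply `on_update`."""
    if key in table:
        # Conflict: compute the new value from the stored one,
        # similar in spirit to F("some_int_field") + 1.
        table[key] = on_update(table[key])
    else:
        # No conflict: the value supplied on the object is inserted.
        table[key] = insert_value


counters: Dict[str, int] = {}
for some_key in ["a", "b", "a"]:
    upsert_with_expression(counters, some_key, 0, lambda current: current + 1)
print(counters)
```

After the three calls, "a" has been inserted once and incremented once, while "b" still holds its inserted value of zero.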
Source code in pgbulk/core.py