gallery.accords-library.com/server/szurubooru/func/snapshots.py

129 lines
4.0 KiB
Python
Raw Normal View History

2016-08-14 18:06:49 +00:00
from datetime import datetime
from szurubooru import db
2016-08-14 18:06:49 +00:00
from szurubooru.func import diff, users
def get_tag_category_snapshot(category):
    """Build the snapshot dictionary describing a tag category.

    The result carries the category's ``name`` and ``color`` unchanged and
    its ``default`` attribute coerced to a plain ``bool``.
    """
    assert category
    snapshot = {}
    snapshot['name'] = category.name
    snapshot['color'] = category.color
    # Normalize whatever truthy/falsy value is stored to a real boolean.
    snapshot['default'] = bool(category.default)
    return snapshot
def get_tag_snapshot(tag):
    """Build the snapshot dictionary describing a tag.

    Names are kept in the order ``tag.names`` yields them; the first names
    of suggested and implied tags are sorted.
    """
    assert tag
    names = [entry.name for entry in tag.names]
    category_name = tag.category.name

    suggested = [related.first_name for related in tag.suggestions]
    suggested.sort()

    implied = [related.first_name for related in tag.implications]
    implied.sort()

    return {
        'names': names,
        'category': category_name,
        'suggestions': suggested,
        'implications': implied,
    }
2016-04-22 18:58:04 +00:00
2016-04-22 18:58:04 +00:00
def get_post_snapshot(post):
    """Build the snapshot dictionary describing a post.

    Tag first names and related post ids are sorted. Notes are reduced to
    their polygon (as a list of ``[x, y]`` pairs) and text, and ordered by
    polygon.
    """
    assert post

    def _polygon_of(note_entry):
        # Sort key: notes are ordered by their polygon coordinates.
        return note_entry['polygon']

    snapshot = {
        'source': post.source,
        'safety': post.safety,
        'checksum': post.checksum,
        'flags': post.flags,
        'featured': post.is_featured,
    }

    tag_names = [tag.first_name for tag in post.tags]
    tag_names.sort()
    snapshot['tags'] = tag_names

    related_ids = [relation.post_id for relation in post.relations]
    related_ids.sort()
    snapshot['relations'] = related_ids

    note_entries = []
    for note in post.notes:
        note_entries.append({
            'polygon': [[point[0], point[1]] for point in note.polygon],
            'text': note.text,
        })
    note_entries.sort(key=_polygon_of)
    snapshot['notes'] = note_entries

    return snapshot
2016-08-14 18:06:49 +00:00
_snapshot_factories = {
# lambdas allow mocking target functions in the tests
# pylint: disable=unnecessary-lambda
'tag_category': lambda entity: get_tag_category_snapshot(entity),
'tag': lambda entity: get_tag_snapshot(entity),
'post': lambda entity: get_post_snapshot(entity),
}
2016-08-14 18:06:49 +00:00
def serialize_snapshot(snapshot, auth_user):
    """Convert a snapshot record into a plain dictionary.

    The snapshot's user is serialized through
    ``users.serialize_micro_user`` with ``auth_user`` passed along; all
    other fields are copied from the snapshot as-is.
    """
    assert snapshot
    serialized = {}
    serialized['operation'] = snapshot.operation
    serialized['type'] = snapshot.resource_type
    serialized['id'] = snapshot.resource_name
    serialized['user'] = users.serialize_micro_user(snapshot.user, auth_user)
    serialized['data'] = snapshot.data
    serialized['time'] = snapshot.creation_time
    return serialized
2016-08-14 18:06:49 +00:00
def _create(operation, entity, auth_user):
    """Instantiate a snapshot record for ``entity`` without any data.

    Fills in the creation time, operation, the resource type / primary key /
    name reported by ``db.util.get_resource_info`` and the acting user.
    The record is returned to the caller; it is not added to the session
    here.
    """
    resource_info = db.util.get_resource_info(entity)
    kind, pkey, name = resource_info

    record = db.Snapshot()
    record.creation_time = datetime.utcnow()
    record.operation = operation
    record.resource_type = kind
    record.resource_pkey = pkey
    record.resource_name = name
    record.user = auth_user
    return record
2016-08-14 18:06:49 +00:00
def create(entity, auth_user):
    """Record a "created" snapshot holding the entity's full snapshot data.

    The snapshot is added to ``db.session``.
    """
    assert entity
    record = _create(db.Snapshot.OPERATION_CREATED, entity, auth_user)
    # Pick the factory matching the entity's resource type.
    build_data = _snapshot_factories[record.resource_type]
    record.data = build_data(entity)
    db.session.add(record)
2016-08-14 18:06:49 +00:00
# pylint: disable=protected-access
def modify(entity, auth_user):
    """Record a "modified" snapshot holding what changed on ``entity``.

    The entity is loaded a second time, by primary key, through a separate
    session obtained from ``db.sessionmaker()``. The snapshot data built
    from that copy is diffed against the snapshot data built from the
    passed-in ``entity``. If the diff is empty nothing is recorded;
    otherwise the snapshot is added to ``db.session``.

    Raises:
        AssertionError: if ``entity`` is falsy, if no mapped model matches
            the entity's table, or if the entity cannot be loaded through
            the separate session.
    """
    assert entity

    # Find the mapped class whose table matches the entity's table.
    table_name = entity.__table__.fullname
    model = next(
        (
            candidate
            for candidate in db.Base._decl_class_registry.values()
            if hasattr(candidate, '__table__')
            and candidate.__table__.fullname == table_name
        ),
        None)
    assert model

    snapshot = _create(db.Snapshot.OPERATION_MODIFIED, entity, auth_user)
    snapshot_factory = _snapshot_factories[snapshot.resource_type]

    detached_session = db.sessionmaker()
    try:
        detached_entity = (
            detached_session.query(model).get(snapshot.resource_pkey))
        assert detached_entity, 'Entity not found in DB, have you committed it?'
        # Build the snapshot while the session is still open: the factory
        # reads attributes of the loaded entity.
        detached_snapshot = snapshot_factory(detached_entity)
    finally:
        # Always release the secondary session, even when the lookup
        # assertion or the factory fails.
        detached_session.close()

    active_snapshot = snapshot_factory(entity)
    snapshot.data = diff.get_dict_diff(detached_snapshot, active_snapshot)
    if not snapshot.data:
        # Nothing changed, so there is nothing worth recording.
        return
    db.session.add(snapshot)
2016-08-14 18:06:49 +00:00
def delete(entity, auth_user):
    """Record a "deleted" snapshot holding the entity's full snapshot data.

    The snapshot is added to ``db.session``.
    """
    assert entity
    record = _create(db.Snapshot.OPERATION_DELETED, entity, auth_user)
    # Pick the factory matching the entity's resource type.
    build_data = _snapshot_factories[record.resource_type]
    record.data = build_data(entity)
    db.session.add(record)
def merge(source_entity, target_entity, auth_user):
    """Record a "merged" snapshot for ``source_entity``.

    The snapshot's data is a two-item list with the resource type and
    resource name of ``target_entity``. The snapshot is added to
    ``db.session``.
    """
    assert source_entity
    assert target_entity
    record = _create(db.Snapshot.OPERATION_MERGED, source_entity, auth_user)
    target_info = db.util.get_resource_info(target_entity)
    # The target's primary key is not stored in the snapshot data.
    target_type, _target_pkey, target_name = target_info
    record.data = [target_type, target_name]
    db.session.add(record)