server/tags: move tag categories to database

This commit is contained in:
rr- 2016-04-18 21:41:47 +02:00
parent 9350c4ff97
commit e3a4793d54
20 changed files with 166 additions and 123 deletions

View File

@ -37,13 +37,6 @@ limits:
max_comment_length: 5000 max_comment_length: 5000
tag_name_regex: ^:?[a-zA-Z0-9_-]+$ tag_name_regex: ^:?[a-zA-Z0-9_-]+$
tag_categories:
- plain
- meta
- artist
- character
- copyright
- other unique
# changing ranks after deployment may require manual tweaks to the database. # changing ranks after deployment may require manual tweaks to the database.
ranks: ranks:

View File

@ -11,7 +11,7 @@ MAIL_BODY = \
class PasswordResetApi(BaseApi): class PasswordResetApi(BaseApi):
def get(self, ctx, user_name): def get(self, ctx, user_name):
''' Send a mail with secure token to the correlated user. ''' ''' Send a mail with secure token to the correlated user. '''
user = users.get_by_name_or_email(ctx.session, user_name) user = users.get_user_by_name_or_email(ctx.session, user_name)
if not user: if not user:
raise errors.NotFoundError('User %r not found.' % user_name) raise errors.NotFoundError('User %r not found.' % user_name)
if not user.email: if not user.email:
@ -29,7 +29,7 @@ class PasswordResetApi(BaseApi):
def post(self, ctx, user_name): def post(self, ctx, user_name):
''' Verify token from mail, generate a new password and return it. ''' ''' Verify token from mail, generate a new password and return it. '''
user = users.get_by_name_or_email(ctx.session, user_name) user = users.get_user_by_name_or_email(ctx.session, user_name)
if not user: if not user:
raise errors.NotFoundError('User %r not found.' % user_name) raise errors.NotFoundError('User %r not found.' % user_name)
good_token = auth.generate_authentication_token(user) good_token = auth.generate_authentication_token(user)

View File

@ -6,7 +6,7 @@ from szurubooru.api.base_api import BaseApi
def _serialize_tag(tag): def _serialize_tag(tag):
return { return {
'names': [tag_name.name for tag_name in tag.names], 'names': [tag_name.name for tag_name in tag.names],
'category': tag.category, 'category': tag.category.name,
'suggestions': [ 'suggestions': [
relation.names[0].name for relation in tag.suggestions], relation.names[0].name for relation in tag.suggestions],
'implications': [ 'implications': [
@ -47,21 +47,22 @@ class TagListApi(BaseApi):
tag = tags.create_tag( tag = tags.create_tag(
ctx.session, names, category, suggestions, implications) ctx.session, names, category, suggestions, implications)
ctx.session.add(tag) ctx.session.add(tag)
ctx.session.flush()
snapshots.create(ctx.session, tag, ctx.user)
ctx.session.commit() ctx.session.commit()
tags.export_to_json(ctx.session) tags.export_to_json(ctx.session)
snapshots.create(ctx.session, tag, ctx.user)
return {'tag': _serialize_tag(tag)} return {'tag': _serialize_tag(tag)}
class TagDetailApi(BaseApi): class TagDetailApi(BaseApi):
def get(self, ctx, tag_name): def get(self, ctx, tag_name):
auth.verify_privilege(ctx.user, 'tags:view') auth.verify_privilege(ctx.user, 'tags:view')
tag = tags.get_by_name(ctx.session, tag_name) tag = tags.get_tag_by_name(ctx.session, tag_name)
if not tag: if not tag:
raise tags.TagNotFoundError('Tag %r not found.' % tag_name) raise tags.TagNotFoundError('Tag %r not found.' % tag_name)
return {'tag': _serialize_tag(tag)} return {'tag': _serialize_tag(tag)}
def put(self, ctx, tag_name): def put(self, ctx, tag_name):
tag = tags.get_by_name(ctx.session, tag_name) tag = tags.get_tag_by_name(ctx.session, tag_name)
if not tag: if not tag:
raise tags.TagNotFoundError('Tag %r not found.' % tag_name) raise tags.TagNotFoundError('Tag %r not found.' % tag_name)
@ -72,7 +73,8 @@ class TagDetailApi(BaseApi):
if ctx.has_param('category'): if ctx.has_param('category'):
auth.verify_privilege(ctx.user, 'tags:edit:category') auth.verify_privilege(ctx.user, 'tags:edit:category')
tags.update_category(tag, ctx.get_param_as_string('category')) tags.update_category_name(
ctx.session, tag, ctx.get_param_as_string('category'))
if ctx.has_param('suggestions'): if ctx.has_param('suggestions'):
auth.verify_privilege(ctx.user, 'tags:edit:suggestions') auth.verify_privilege(ctx.user, 'tags:edit:suggestions')
@ -85,13 +87,13 @@ class TagDetailApi(BaseApi):
ctx.session, tag, ctx.get_param_as_list('implications')) ctx.session, tag, ctx.get_param_as_list('implications'))
tag.last_edit_time = datetime.datetime.now() tag.last_edit_time = datetime.datetime.now()
snapshots.modify(ctx.session, tag, ctx.user)
ctx.session.commit() ctx.session.commit()
tags.export_to_json(ctx.session) tags.export_to_json(ctx.session)
snapshots.modify(ctx.session, tag, ctx.user)
return {'tag': _serialize_tag(tag)} return {'tag': _serialize_tag(tag)}
def delete(self, ctx, tag_name): def delete(self, ctx, tag_name):
tag = tags.get_by_name(ctx.session, tag_name) tag = tags.get_tag_by_name(ctx.session, tag_name)
if not tag: if not tag:
raise tags.TagNotFoundError('Tag %r not found.' % tag_name) raise tags.TagNotFoundError('Tag %r not found.' % tag_name)
if tag.post_count > 0: if tag.post_count > 0:
@ -100,8 +102,8 @@ class TagDetailApi(BaseApi):
'Please untag relevant posts first.') 'Please untag relevant posts first.')
auth.verify_privilege(ctx.user, 'tags:delete') auth.verify_privilege(ctx.user, 'tags:delete')
snapshots.delete(ctx.session, tag, ctx.user)
ctx.session.delete(tag) ctx.session.delete(tag)
ctx.session.commit() ctx.session.commit()
snapshots.delete(ctx.session, tag, ctx.user)
tags.export_to_json(ctx.session) tags.export_to_json(ctx.session)
return {} return {}

View File

@ -75,13 +75,13 @@ class UserListApi(BaseApi):
class UserDetailApi(BaseApi): class UserDetailApi(BaseApi):
def get(self, ctx, user_name): def get(self, ctx, user_name):
auth.verify_privilege(ctx.user, 'users:view') auth.verify_privilege(ctx.user, 'users:view')
user = users.get_by_name(ctx.session, user_name) user = users.get_user_by_name(ctx.session, user_name)
if not user: if not user:
raise users.UserNotFoundError('User %r not found.' % user_name) raise users.UserNotFoundError('User %r not found.' % user_name)
return {'user': _serialize_user(ctx.user, user)} return {'user': _serialize_user(ctx.user, user)}
def put(self, ctx, user_name): def put(self, ctx, user_name):
user = users.get_by_name(ctx.session, user_name) user = users.get_user_by_name(ctx.session, user_name)
if not user: if not user:
raise users.UserNotFoundError('User %r not found.' % user_name) raise users.UserNotFoundError('User %r not found.' % user_name)
@ -119,7 +119,7 @@ class UserDetailApi(BaseApi):
return {'user': _serialize_user(ctx.user, user)} return {'user': _serialize_user(ctx.user, user)}
def delete(self, ctx, user_name): def delete(self, ctx, user_name):
user = users.get_by_name(ctx.session, user_name) user = users.get_user_by_name(ctx.session, user_name)
if not user: if not user:
raise users.UserNotFoundError('User %r not found.' % user_name) raise users.UserNotFoundError('User %r not found.' % user_name)

View File

@ -53,8 +53,5 @@ def validate_config(src):
raise errors.ConfigError( raise errors.ConfigError(
'Database is not configured: %r is missing' % key) 'Database is not configured: %r is missing' % key)
if not len(src['tag_categories']):
raise errors.ConfigError('Must have at least one tag category')
config = read_config() # pylint: disable=invalid-name config = read_config() # pylint: disable=invalid-name
validate_config(config) validate_config(config)

View File

@ -1,5 +1,6 @@
from szurubooru.db.base import Base from szurubooru.db.base import Base
from szurubooru.db.user import User from szurubooru.db.user import User
from szurubooru.db.tag import Tag, TagName, TagSuggestion, TagImplication from szurubooru.db.tag import Tag, TagName, TagSuggestion, TagImplication
from szurubooru.db.tag_category import TagCategory
from szurubooru.db.post import Post, PostTag, PostRelation from szurubooru.db.post import Post, PostTag, PostRelation
from szurubooru.db.snapshot import Snapshot from szurubooru.db.snapshot import Snapshot

View File

@ -7,8 +7,10 @@ from szurubooru.db.post import PostTag
class TagSuggestion(Base): class TagSuggestion(Base):
__tablename__ = 'tag_suggestion' __tablename__ = 'tag_suggestion'
parent_id = Column('parent_id', Integer, ForeignKey('tag.id'), primary_key=True) parent_id = Column(
child_id = Column('child_id', Integer, ForeignKey('tag.id'), primary_key=True) 'parent_id', Integer, ForeignKey('tag.id'), primary_key=True)
child_id = Column(
'child_id', Integer, ForeignKey('tag.id'), primary_key=True)
def __init__(self, parent_id, child_id): def __init__(self, parent_id, child_id):
self.parent_id = parent_id self.parent_id = parent_id
@ -17,8 +19,10 @@ class TagSuggestion(Base):
class TagImplication(Base): class TagImplication(Base):
__tablename__ = 'tag_implication' __tablename__ = 'tag_implication'
parent_id = Column('parent_id', Integer, ForeignKey('tag.id'), primary_key=True) parent_id = Column(
child_id = Column('child_id', Integer, ForeignKey('tag.id'), primary_key=True) 'parent_id', Integer, ForeignKey('tag.id'), primary_key=True)
child_id = Column(
'child_id', Integer, ForeignKey('tag.id'), primary_key=True)
def __init__(self, parent_id, child_id): def __init__(self, parent_id, child_id):
self.parent_id = parent_id self.parent_id = parent_id
@ -28,7 +32,7 @@ class TagName(Base):
__tablename__ = 'tag_name' __tablename__ = 'tag_name'
tag_name_id = Column('tag_name_id', Integer, primary_key=True) tag_name_id = Column('tag_name_id', Integer, primary_key=True)
tag_id = Column('tag_id', Integer, ForeignKey('tag.id')) tag_id = Column('tag_id', Integer, ForeignKey('tag.id'), nullable=False)
name = Column('name', String(64), nullable=False, unique=True) name = Column('name', String(64), nullable=False, unique=True)
def __init__(self, name): def __init__(self, name):
@ -38,10 +42,12 @@ class Tag(Base):
__tablename__ = 'tag' __tablename__ = 'tag'
tag_id = Column('id', Integer, primary_key=True) tag_id = Column('id', Integer, primary_key=True)
category = Column('category', String(32), nullable=False) category_id = Column(
'category_id', Integer, ForeignKey('tag_category.id'), nullable=False)
creation_time = Column('creation_time', DateTime, nullable=False) creation_time = Column('creation_time', DateTime, nullable=False)
last_edit_time = Column('last_edit_time', DateTime) last_edit_time = Column('last_edit_time', DateTime)
category = relationship('TagCategory')
names = relationship('TagName', cascade='all, delete-orphan') names = relationship('TagName', cascade='all, delete-orphan')
suggestions = relationship( suggestions = relationship(
'Tag', 'Tag',

View File

@ -0,0 +1,12 @@
from sqlalchemy import Column, Integer, String
from szurubooru.db.base import Base
class TagCategory(Base):
__tablename__ = 'tag_category'
tag_category_id = Column('id', Integer, primary_key=True)
name = Column('name', String(32), nullable=False)
color = Column('color', String(32), nullable=False, default='#000000')
def __init__(self, name):
self.name = name

View File

@ -40,7 +40,7 @@ class Authenticator(object):
def _authenticate(self, session, username, password): def _authenticate(self, session, username, password):
''' Try to authenticate user. Throw AuthError for invalid users. ''' ''' Try to authenticate user. Throw AuthError for invalid users. '''
user = users.get_by_name(session, username) user = users.get_user_by_name(session, username)
if not user: if not user:
raise errors.AuthError('No such user.') raise errors.AuthError('No such user.')
if not auth.is_valid_password(user, password): if not auth.is_valid_password(user, password):

View File

@ -14,18 +14,26 @@ branch_labels = None
depends_on = None depends_on = None
def upgrade(): def upgrade():
op.create_table(
'tag_category',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=32), nullable=False),
sa.Column('color', sa.String(length=32), nullable=False),
sa.PrimaryKeyConstraint('id'))
op.create_table( op.create_table(
'tag', 'tag',
sa.Column('id', sa.Integer(), nullable=False), sa.Column('id', sa.Integer(), nullable=False),
sa.Column('category', sa.String(length=32), nullable=False), sa.Column('category_id', sa.Integer(), nullable=False),
sa.Column('creation_time', sa.DateTime(), nullable=False), sa.Column('creation_time', sa.DateTime(), nullable=False),
sa.Column('last_edit_time', sa.DateTime(), nullable=True), sa.Column('last_edit_time', sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(['category_id'], ['tag_category.id']),
sa.PrimaryKeyConstraint('id')) sa.PrimaryKeyConstraint('id'))
op.create_table( op.create_table(
'tag_name', 'tag_name',
sa.Column('tag_name_id', sa.Integer(), nullable=False), sa.Column('tag_name_id', sa.Integer(), nullable=False),
sa.Column('tag_id', sa.Integer(), nullable=True), sa.Column('tag_id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=64), nullable=False), sa.Column('name', sa.String(length=64), nullable=False),
sa.ForeignKeyConstraint(['tag_id'], ['tag.id']), sa.ForeignKeyConstraint(['tag_id'], ['tag.id']),
sa.PrimaryKeyConstraint('tag_name_id'), sa.PrimaryKeyConstraint('tag_name_id'),
@ -49,6 +57,7 @@ def upgrade():
def downgrade(): def downgrade():
op.drop_table('tag_suggestion') op.drop_table('tag_suggestion')
op.drop_table('tag_name')
op.drop_table('tag_implication') op.drop_table('tag_implication')
op.drop_table('tag_name')
op.drop_table('tag') op.drop_table('tag')
op.drop_table('tag_category')

View File

@ -24,11 +24,14 @@ def test_ctx(
tag_factory): tag_factory):
config_injector({ config_injector({
'data_dir': str(tmpdir), 'data_dir': str(tmpdir),
'tag_categories': ['meta', 'character', 'copyright'],
'tag_name_regex': '^[^!]*$', 'tag_name_regex': '^[^!]*$',
'ranks': ['anonymous', 'regular_user'], 'ranks': ['anonymous', 'regular_user'],
'privileges': {'tags:create': 'regular_user'}, 'privileges': {'tags:create': 'regular_user'},
}) })
session.add_all([
db.TagCategory(name) for name in [
'meta', 'character', 'copyright']])
session.flush()
ret = misc.dotdict() ret = misc.dotdict()
ret.session = session ret.session = session
ret.context_factory = context_factory ret.context_factory = context_factory
@ -60,7 +63,7 @@ def test_creating_simple_tags(test_ctx, fake_datetime):
} }
tag = get_tag(test_ctx.session, 'tag1') tag = get_tag(test_ctx.session, 'tag1')
assert [tag_name.name for tag_name in tag.names] == ['tag1', 'tag2'] assert [tag_name.name for tag_name in tag.names] == ['tag1', 'tag2']
assert tag.category == 'meta' assert tag.category.name == 'meta'
assert tag.last_edit_time is None assert tag.last_edit_time is None
assert tag.post_count == 0 assert tag.post_count == 0
assert_relations(tag.suggestions, []) assert_relations(tag.suggestions, [])
@ -111,8 +114,8 @@ def test_trying_to_create_tag_with_invalid_name(test_ctx, names):
def test_trying_to_use_existing_name(test_ctx): def test_trying_to_use_existing_name(test_ctx):
test_ctx.session.add_all([ test_ctx.session.add_all([
test_ctx.tag_factory(names=['used1'], category='meta'), test_ctx.tag_factory(names=['used1'], category_name='meta'),
test_ctx.tag_factory(names=['used2'], category='meta'), test_ctx.tag_factory(names=['used2'], category_name='meta'),
]) ])
test_ctx.session.commit() test_ctx.session.commit()
with pytest.raises(tags.TagAlreadyExistsError): with pytest.raises(tags.TagAlreadyExistsError):
@ -195,8 +198,8 @@ def test_creating_new_suggestions_and_implications(
def test_reusing_suggestions_and_implications(test_ctx): def test_reusing_suggestions_and_implications(test_ctx):
test_ctx.session.add_all([ test_ctx.session.add_all([
test_ctx.tag_factory(names=['tag1', 'tag2'], category='meta'), test_ctx.tag_factory(names=['tag1', 'tag2'], category_name='meta'),
test_ctx.tag_factory(names=['tag3'], category='meta'), test_ctx.tag_factory(names=['tag3'], category_name='meta'),
]) ])
test_ctx.session.commit() test_ctx.session.commit()
result = test_ctx.api.post( result = test_ctx.api.post(

View File

@ -27,7 +27,7 @@ def test_ctx(
ret.api = api.TagDetailApi() ret.api = api.TagDetailApi()
return ret return ret
def test_removing_tags(test_ctx): def test_deleting_tags(test_ctx):
test_ctx.session.add(test_ctx.tag_factory(names=['tag'])) test_ctx.session.add(test_ctx.tag_factory(names=['tag']))
test_ctx.session.commit() test_ctx.session.commit()
result = test_ctx.api.delete( result = test_ctx.api.delete(
@ -38,7 +38,7 @@ def test_removing_tags(test_ctx):
assert test_ctx.session.query(db.Tag).count() == 0 assert test_ctx.session.query(db.Tag).count() == 0
assert os.path.exists(os.path.join(config.config['data_dir'], 'tags.json')) assert os.path.exists(os.path.join(config.config['data_dir'], 'tags.json'))
def test_removing_tags_without_privileges(test_ctx): def test_deleting_tags_without_privileges(test_ctx):
test_ctx.session.add(test_ctx.tag_factory(names=['tag'])) test_ctx.session.add(test_ctx.tag_factory(names=['tag']))
test_ctx.session.commit() test_ctx.session.commit()
with pytest.raises(errors.AuthError): with pytest.raises(errors.AuthError):
@ -48,7 +48,7 @@ def test_removing_tags_without_privileges(test_ctx):
'tag') 'tag')
assert test_ctx.session.query(db.Tag).count() == 1 assert test_ctx.session.query(db.Tag).count() == 1
def test_removing_tags_with_usages(test_ctx, post_factory): def test_deleting_tags_with_usages(test_ctx, post_factory):
tag = test_ctx.tag_factory(names=['tag']) tag = test_ctx.tag_factory(names=['tag'])
post = post_factory() post = post_factory()
post.tags.append(tag) post.tags.append(tag)
@ -61,7 +61,7 @@ def test_removing_tags_with_usages(test_ctx, post_factory):
'tag') 'tag')
assert test_ctx.session.query(db.Tag).count() == 1 assert test_ctx.session.query(db.Tag).count() == 1
def test_removing_non_existing(test_ctx): def test_deleting_non_existing(test_ctx):
with pytest.raises(tags.TagNotFoundError): with pytest.raises(tags.TagNotFoundError):
test_ctx.api.delete( test_ctx.api.delete(
test_ctx.context_factory( test_ctx.context_factory(

View File

@ -24,7 +24,6 @@ def test_ctx(
tag_factory): tag_factory):
config_injector({ config_injector({
'data_dir': str(tmpdir), 'data_dir': str(tmpdir),
'tag_categories': ['meta', 'character', 'copyright'],
'tag_name_regex': '^[^!]*$', 'tag_name_regex': '^[^!]*$',
'ranks': ['anonymous', 'regular_user'], 'ranks': ['anonymous', 'regular_user'],
'privileges': { 'privileges': {
@ -34,6 +33,10 @@ def test_ctx(
'tags:edit:implications': 'regular_user', 'tags:edit:implications': 'regular_user',
}, },
}) })
session.add_all([
db.TagCategory(name) for name in [
'meta', 'character', 'copyright']])
session.flush()
ret = misc.dotdict() ret = misc.dotdict()
ret.session = session ret.session = session
ret.context_factory = context_factory ret.context_factory = context_factory
@ -43,7 +46,7 @@ def test_ctx(
return ret return ret
def test_simple_updating(test_ctx, fake_datetime): def test_simple_updating(test_ctx, fake_datetime):
tag = test_ctx.tag_factory(names=['tag1', 'tag2'], category='meta') tag = test_ctx.tag_factory(names=['tag1', 'tag2'], category_name='meta')
test_ctx.session.add(tag) test_ctx.session.add(tag)
test_ctx.session.commit() test_ctx.session.commit()
with fake_datetime('1997-12-01'): with fake_datetime('1997-12-01'):
@ -70,7 +73,7 @@ def test_simple_updating(test_ctx, fake_datetime):
tag = get_tag(test_ctx.session, 'tag3') tag = get_tag(test_ctx.session, 'tag3')
assert tag is not None assert tag is not None
assert [tag_name.name for tag_name in tag.names] == ['tag3'] assert [tag_name.name for tag_name in tag.names] == ['tag3']
assert tag.category == 'character' assert tag.category.name == 'character'
assert tag.suggestions == [] assert tag.suggestions == []
assert tag.implications == [] assert tag.implications == []
assert os.path.exists(os.path.join(config.config['data_dir'], 'tags.json')) assert os.path.exists(os.path.join(config.config['data_dir'], 'tags.json'))
@ -86,7 +89,7 @@ def test_trying_to_update_non_existing_tag(test_ctx):
@pytest.mark.parametrize('dup_name', ['tag1', 'TAG1']) @pytest.mark.parametrize('dup_name', ['tag1', 'TAG1'])
def test_reusing_own_name(test_ctx, dup_name): def test_reusing_own_name(test_ctx, dup_name):
test_ctx.session.add( test_ctx.session.add(
test_ctx.tag_factory(names=['tag1', 'tag2'], category='meta')) test_ctx.tag_factory(names=['tag1', 'tag2'], category_name='meta'))
test_ctx.session.commit() test_ctx.session.commit()
result = test_ctx.api.put( result = test_ctx.api.put(
test_ctx.context_factory( test_ctx.context_factory(
@ -102,7 +105,7 @@ def test_reusing_own_name(test_ctx, dup_name):
def test_duplicating_names(test_ctx): def test_duplicating_names(test_ctx):
test_ctx.session.add( test_ctx.session.add(
test_ctx.tag_factory(names=['tag1', 'tag2'], category='meta')) test_ctx.tag_factory(names=['tag1', 'tag2'], category_name='meta'))
result = test_ctx.api.put( result = test_ctx.api.put(
test_ctx.context_factory( test_ctx.context_factory(
input={'names': ['tag3', 'TAG3']}, input={'names': ['tag3', 'TAG3']},
@ -121,7 +124,8 @@ def test_duplicating_names(test_ctx):
{'names': ['x' * 65]}, {'names': ['x' * 65]},
]) ])
def test_trying_to_set_invalid_name(test_ctx, input): def test_trying_to_set_invalid_name(test_ctx, input):
test_ctx.session.add(test_ctx.tag_factory(names=['tag1'], category='meta')) test_ctx.session.add(
test_ctx.tag_factory(names=['tag1'], category_name='meta'))
test_ctx.session.commit() test_ctx.session.commit()
with pytest.raises(tags.InvalidNameError): with pytest.raises(tags.InvalidNameError):
test_ctx.api.put( test_ctx.api.put(
@ -133,8 +137,8 @@ def test_trying_to_set_invalid_name(test_ctx, input):
@pytest.mark.parametrize('dup_name', ['tag1', 'TAG1', 'tag2', 'TAG2']) @pytest.mark.parametrize('dup_name', ['tag1', 'TAG1', 'tag2', 'TAG2'])
def test_trying_to_use_existing_name(test_ctx, dup_name): def test_trying_to_use_existing_name(test_ctx, dup_name):
test_ctx.session.add_all([ test_ctx.session.add_all([
test_ctx.tag_factory(names=['tag1', 'tag2'], category='meta'), test_ctx.tag_factory(names=['tag1', 'tag2'], category_name='meta'),
test_ctx.tag_factory(names=['tag3', 'tag4'], category='meta')]) test_ctx.tag_factory(names=['tag3', 'tag4'], category_name='meta')])
test_ctx.session.commit() test_ctx.session.commit()
with pytest.raises(tags.TagAlreadyExistsError): with pytest.raises(tags.TagAlreadyExistsError):
test_ctx.api.put( test_ctx.api.put(
@ -144,7 +148,8 @@ def test_trying_to_use_existing_name(test_ctx, dup_name):
'tag3') 'tag3')
def test_trying_to_update_tag_with_invalid_category(test_ctx): def test_trying_to_update_tag_with_invalid_category(test_ctx):
test_ctx.session.add(test_ctx.tag_factory(names=['tag1'], category='meta')) test_ctx.session.add(
test_ctx.tag_factory(names=['tag1'], category_name='meta'))
test_ctx.session.commit() test_ctx.session.commit()
with pytest.raises(tags.InvalidCategoryError): with pytest.raises(tags.InvalidCategoryError):
test_ctx.api.put( test_ctx.api.put(
@ -182,7 +187,8 @@ def test_trying_to_update_tag_with_invalid_category(test_ctx):
]) ])
def test_updating_new_suggestions_and_implications( def test_updating_new_suggestions_and_implications(
test_ctx, input, expected_suggestions, expected_implications): test_ctx, input, expected_suggestions, expected_implications):
test_ctx.session.add(test_ctx.tag_factory(names=['main'], category='meta')) test_ctx.session.add(
test_ctx.tag_factory(names=['main'], category_name='meta'))
test_ctx.session.commit() test_ctx.session.commit()
result = test_ctx.api.put( result = test_ctx.api.put(
test_ctx.context_factory( test_ctx.context_factory(
@ -198,9 +204,9 @@ def test_updating_new_suggestions_and_implications(
def test_reusing_suggestions_and_implications(test_ctx): def test_reusing_suggestions_and_implications(test_ctx):
test_ctx.session.add_all([ test_ctx.session.add_all([
test_ctx.tag_factory(names=['tag1', 'tag2'], category='meta'), test_ctx.tag_factory(names=['tag1', 'tag2'], category_name='meta'),
test_ctx.tag_factory(names=['tag3'], category='meta'), test_ctx.tag_factory(names=['tag3'], category_name='meta'),
test_ctx.tag_factory(names=['tag4'], category='meta'), test_ctx.tag_factory(names=['tag4'], category_name='meta'),
]) ])
test_ctx.session.commit() test_ctx.session.commit()
result = test_ctx.api.put( result = test_ctx.api.put(
@ -225,7 +231,8 @@ def test_reusing_suggestions_and_implications(test_ctx):
{'names': ['ok'], 'implications': ['ok2', '!']}, {'names': ['ok'], 'implications': ['ok2', '!']},
]) ])
def test_trying_to_update_tag_with_invalid_relation(test_ctx, input): def test_trying_to_update_tag_with_invalid_relation(test_ctx, input):
test_ctx.session.add(test_ctx.tag_factory(names=['tag'], category='meta')) test_ctx.session.add(
test_ctx.tag_factory(names=['tag'], category_name='meta'))
test_ctx.session.commit() test_ctx.session.commit()
with pytest.raises(tags.InvalidNameError): with pytest.raises(tags.InvalidNameError):
test_ctx.api.put( test_ctx.api.put(
@ -253,7 +260,8 @@ def test_trying_to_update_tag_with_invalid_relation(test_ctx, input):
} }
]) ])
def test_tag_trying_to_relate_to_itself(test_ctx, input): def test_tag_trying_to_relate_to_itself(test_ctx, input):
test_ctx.session.add(test_ctx.tag_factory(names=['tag1'], category='meta')) test_ctx.session.add(
test_ctx.tag_factory(names=['tag1'], category_name='meta'))
test_ctx.session.commit() test_ctx.session.commit()
with pytest.raises(tags.RelationError): with pytest.raises(tags.RelationError):
test_ctx.api.put( test_ctx.api.put(
@ -268,7 +276,8 @@ def test_tag_trying_to_relate_to_itself(test_ctx, input):
{'implications': ['whatever']}, {'implications': ['whatever']},
]) ])
def test_trying_to_update_tag_without_privileges(test_ctx, input): def test_trying_to_update_tag_without_privileges(test_ctx, input):
test_ctx.session.add(test_ctx.tag_factory(names=['tag'], category='meta')) test_ctx.session.add(
test_ctx.tag_factory(names=['tag'], category_name='meta'))
test_ctx.session.commit() test_ctx.session.commit()
with pytest.raises(errors.AuthError): with pytest.raises(errors.AuthError):
test_ctx.api.put( test_ctx.api.put(

View File

@ -19,7 +19,7 @@ def test_ctx(session, config_injector, context_factory, user_factory):
ret.api = api.UserDetailApi() ret.api = api.UserDetailApi()
return ret return ret
def test_removing_oneself(test_ctx): def test_deleting_oneself(test_ctx):
user1 = test_ctx.user_factory(name='u1', rank='regular_user') user1 = test_ctx.user_factory(name='u1', rank='regular_user')
user2 = test_ctx.user_factory(name='u2', rank='regular_user') user2 = test_ctx.user_factory(name='u2', rank='regular_user')
test_ctx.session.add_all([user1, user2]) test_ctx.session.add_all([user1, user2])
@ -30,7 +30,7 @@ def test_removing_oneself(test_ctx):
assert result == {} assert result == {}
assert [u.name for u in test_ctx.session.query(db.User).all()] == ['u2'] assert [u.name for u in test_ctx.session.query(db.User).all()] == ['u2']
def test_removing_someone_else(test_ctx): def test_deleting_someone_else(test_ctx):
user1 = test_ctx.user_factory(name='u1', rank='regular_user') user1 = test_ctx.user_factory(name='u1', rank='regular_user')
user2 = test_ctx.user_factory(name='u2', rank='regular_user') user2 = test_ctx.user_factory(name='u2', rank='regular_user')
mod_user = test_ctx.user_factory(rank='mod') mod_user = test_ctx.user_factory(rank='mod')
@ -40,7 +40,7 @@ def test_removing_someone_else(test_ctx):
test_ctx.api.delete(test_ctx.context_factory(user=mod_user), 'u2') test_ctx.api.delete(test_ctx.context_factory(user=mod_user), 'u2')
assert test_ctx.session.query(db.User).all() == [] assert test_ctx.session.query(db.User).all() == []
def test_removing_non_existing(test_ctx): def test_deleting_non_existing(test_ctx):
with pytest.raises(users.UserNotFoundError): with pytest.raises(users.UserNotFoundError):
test_ctx.api.delete( test_ctx.api.delete(
test_ctx.context_factory( test_ctx.context_factory(

View File

@ -66,8 +66,10 @@ def user_factory():
return factory return factory
@pytest.fixture @pytest.fixture
def tag_factory(): def tag_factory(session):
def factory(names=None, category='dummy'): def factory(names=None, category_name='dummy'):
category = db.TagCategory(category_name)
session.add(category)
tag = db.Tag() tag = db.Tag()
tag.names = [db.TagName(name) for name in (names or [get_unique_name()])] tag.names = [db.TagName(name) for name in (names or [get_unique_name()])]
tag.category = category tag.category = category

View File

@ -2,30 +2,29 @@ from datetime import datetime
from szurubooru import db from szurubooru import db
def test_saving_tag(session, tag_factory): def test_saving_tag(session, tag_factory):
suggested_tag1 = tag_factory(names=['suggested1']) sug1 = tag_factory(names=['sug1'])
suggested_tag2 = tag_factory(names=['suggested2']) sug2 = tag_factory(names=['sug2'])
implied_tag1 = tag_factory(names=['implied1']) imp1 = tag_factory(names=['imp1'])
implied_tag2 = tag_factory(names=['implied2']) imp2 = tag_factory(names=['imp2'])
tag = db.Tag() tag = db.Tag()
tag.names = [db.TagName('alias1'), db.TagName('alias2')] tag.names = [db.TagName('alias1'), db.TagName('alias2')]
tag.suggestions = [] tag.suggestions = []
tag.implications = [] tag.implications = []
tag.category = 'category' tag.category = db.TagCategory('category')
tag.creation_time = datetime(1997, 1, 1) tag.creation_time = datetime(1997, 1, 1)
tag.last_edit_time = datetime(1998, 1, 1) tag.last_edit_time = datetime(1998, 1, 1)
session.add_all([ session.add_all([tag, sug1, sug2, imp1, imp2])
tag, suggested_tag1, suggested_tag2, implied_tag1, implied_tag2])
session.commit() session.commit()
assert tag.tag_id is not None assert tag.tag_id is not None
assert suggested_tag1.tag_id is not None assert sug1.tag_id is not None
assert suggested_tag2.tag_id is not None assert sug2.tag_id is not None
assert implied_tag1.tag_id is not None assert imp1.tag_id is not None
assert implied_tag2.tag_id is not None assert imp2.tag_id is not None
tag.suggestions.append(suggested_tag1) tag.suggestions.append(sug1)
tag.suggestions.append(suggested_tag2) tag.suggestions.append(sug2)
tag.implications.append(implied_tag1) tag.implications.append(imp1)
tag.implications.append(implied_tag2) tag.implications.append(imp2)
session.commit() session.commit()
tag = session.query(db.Tag) \ tag = session.query(db.Tag) \
@ -33,40 +32,39 @@ def test_saving_tag(session, tag_factory):
.filter(db.TagName.name=='alias1') \ .filter(db.TagName.name=='alias1') \
.one() .one()
assert [tag_name.name for tag_name in tag.names] == ['alias1', 'alias2'] assert [tag_name.name for tag_name in tag.names] == ['alias1', 'alias2']
assert tag.category == 'category' assert tag.category.name == 'category'
assert tag.creation_time == datetime(1997, 1, 1) assert tag.creation_time == datetime(1997, 1, 1)
assert tag.last_edit_time == datetime(1998, 1, 1) assert tag.last_edit_time == datetime(1998, 1, 1)
assert [relation.names[0].name for relation in tag.suggestions] \ assert [relation.names[0].name for relation in tag.suggestions] \
== ['suggested1', 'suggested2'] == ['sug1', 'sug2']
assert [relation.names[0].name for relation in tag.implications] \ assert [relation.names[0].name for relation in tag.implications] \
== ['implied1', 'implied2'] == ['imp1', 'imp2']
def test_cascade_deletions(session, tag_factory): def test_cascade_deletions(session, tag_factory):
suggested_tag1 = tag_factory(names=['suggested1']) sug1 = tag_factory(names=['sug1'])
suggested_tag2 = tag_factory(names=['suggested2']) sug2 = tag_factory(names=['sug2'])
implied_tag1 = tag_factory(names=['implied1']) imp1 = tag_factory(names=['imp1'])
implied_tag2 = tag_factory(names=['implied2']) imp2 = tag_factory(names=['imp2'])
tag = db.Tag() tag = db.Tag()
tag.names = [db.TagName('alias1'), db.TagName('alias2')] tag.names = [db.TagName('alias1'), db.TagName('alias2')]
tag.suggestions = [] tag.suggestions = []
tag.implications = [] tag.implications = []
tag.category = 'category' tag.category = db.TagCategory('category')
tag.creation_time = datetime(1997, 1, 1) tag.creation_time = datetime(1997, 1, 1)
tag.last_edit_time = datetime(1998, 1, 1) tag.last_edit_time = datetime(1998, 1, 1)
tag.post_count = 1 tag.post_count = 1
session.add_all([ session.add_all([tag, sug1, sug2, imp1, imp2])
tag, suggested_tag1, suggested_tag2, implied_tag1, implied_tag2])
session.commit() session.commit()
assert tag.tag_id is not None assert tag.tag_id is not None
assert suggested_tag1.tag_id is not None assert sug1.tag_id is not None
assert suggested_tag2.tag_id is not None assert sug2.tag_id is not None
assert implied_tag1.tag_id is not None assert imp1.tag_id is not None
assert implied_tag2.tag_id is not None assert imp2.tag_id is not None
tag.suggestions.append(suggested_tag1) tag.suggestions.append(sug1)
tag.suggestions.append(suggested_tag2) tag.suggestions.append(sug2)
tag.implications.append(implied_tag1) tag.implications.append(imp1)
tag.implications.append(implied_tag2) tag.implications.append(imp2)
session.commit() session.commit()
session.delete(tag) session.delete(tag)

View File

@ -4,13 +4,13 @@ from szurubooru import db
from szurubooru.util import snapshots from szurubooru.util import snapshots
def test_serializing_tag(session, tag_factory): def test_serializing_tag(session, tag_factory):
tag = tag_factory(names=['main_name', 'alias'], category='dummy') tag = tag_factory(names=['main_name', 'alias'], category_name='dummy')
assert snapshots.get_tag_snapshot(tag) == { assert snapshots.get_tag_snapshot(tag) == {
'names': ['main_name', 'alias'], 'names': ['main_name', 'alias'],
'category': 'dummy' 'category': 'dummy'
} }
tag = tag_factory(names=['main_name', 'alias'], category='dummy') tag = tag_factory(names=['main_name', 'alias'], category_name='dummy')
imp1 = tag_factory(names=['imp1_main_name', 'imp1_alias']) imp1 = tag_factory(names=['imp1_main_name', 'imp1_alias'])
imp2 = tag_factory(names=['imp2_main_name', 'imp2_alias']) imp2 = tag_factory(names=['imp2_main_name', 'imp2_alias'])
sug1 = tag_factory(names=['sug1_main_name', 'sug1_alias']) sug1 = tag_factory(names=['sug1_main_name', 'sug1_alias'])
@ -27,7 +27,7 @@ def test_serializing_tag(session, tag_factory):
} }
def test_merging_modification_to_creation(session, tag_factory, user_factory): def test_merging_modification_to_creation(session, tag_factory, user_factory):
tag = tag_factory(names=['dummy'], category='dummy') tag = tag_factory(names=['dummy'])
user = user_factory() user = user_factory()
session.add_all([tag, user]) session.add_all([tag, user])
session.flush() session.flush()
@ -43,7 +43,7 @@ def test_merging_modification_to_creation(session, tag_factory, user_factory):
def test_merging_modifications( def test_merging_modifications(
fake_datetime, session, tag_factory, user_factory): fake_datetime, session, tag_factory, user_factory):
tag = tag_factory(names=['dummy'], category='dummy') tag = tag_factory(names=['dummy'])
user = user_factory() user = user_factory()
session.add_all([tag, user]) session.add_all([tag, user])
session.flush() session.flush()
@ -67,7 +67,7 @@ def test_merging_modifications(
def test_not_adding_snapshot_if_data_doesnt_change( def test_not_adding_snapshot_if_data_doesnt_change(
fake_datetime, session, tag_factory, user_factory): fake_datetime, session, tag_factory, user_factory):
tag = tag_factory(names=['dummy'], category='dummy') tag = tag_factory(names=['dummy'])
user = user_factory() user = user_factory()
session.add_all([tag, user]) session.add_all([tag, user])
session.flush() session.flush()
@ -84,7 +84,7 @@ def test_not_adding_snapshot_if_data_doesnt_change(
def test_not_merging_due_to_time_difference( def test_not_merging_due_to_time_difference(
fake_datetime, session, tag_factory, user_factory): fake_datetime, session, tag_factory, user_factory):
tag = tag_factory(names=['dummy'], category='dummy') tag = tag_factory(names=['dummy'])
user = user_factory() user = user_factory()
session.add_all([tag, user]) session.add_all([tag, user])
session.flush() session.flush()
@ -99,7 +99,7 @@ def test_not_merging_due_to_time_difference(
def test_not_merging_operations_by_different_users( def test_not_merging_operations_by_different_users(
fake_datetime, session, tag_factory, user_factory): fake_datetime, session, tag_factory, user_factory):
tag = tag_factory(names=['dummy'], category='dummy') tag = tag_factory(names=['dummy'])
user1, user2 = [user_factory(), user_factory()] user1, user2 = [user_factory(), user_factory()]
session.add_all([tag, user1, user2]) session.add_all([tag, user1, user2])
session.flush() session.flush()
@ -113,7 +113,7 @@ def test_not_merging_operations_by_different_users(
def test_merging_resets_merging_time_window( def test_merging_resets_merging_time_window(
fake_datetime, session, tag_factory, user_factory): fake_datetime, session, tag_factory, user_factory):
tag = tag_factory(names=['dummy'], category='dummy') tag = tag_factory(names=['dummy'])
user = user_factory() user = user_factory()
session.add_all([tag, user]) session.add_all([tag, user])
session.flush() session.flush()
@ -136,7 +136,7 @@ def test_merging_resets_merging_time_window(
'initial_operation', [snapshots.create, snapshots.modify]) 'initial_operation', [snapshots.create, snapshots.modify])
def test_merging_deletion_to_modification_or_creation( def test_merging_deletion_to_modification_or_creation(
fake_datetime, session, tag_factory, user_factory, initial_operation): fake_datetime, session, tag_factory, user_factory, initial_operation):
tag = tag_factory(names=['dummy'], category='dummy') tag = tag_factory(names=['dummy'], category_name='dummy')
user = user_factory() user = user_factory()
session.add_all([tag, user]) session.add_all([tag, user])
session.flush() session.flush()
@ -162,7 +162,7 @@ def test_merging_deletion_to_modification_or_creation(
'expected_operation', [snapshots.create, snapshots.modify]) 'expected_operation', [snapshots.create, snapshots.modify])
def test_merging_deletion_all_the_way_deletes_all_snapshots( def test_merging_deletion_all_the_way_deletes_all_snapshots(
fake_datetime, session, tag_factory, user_factory, expected_operation): fake_datetime, session, tag_factory, user_factory, expected_operation):
tag = tag_factory(names=['dummy'], category='dummy') tag = tag_factory(names=['dummy'])
user = user_factory() user = user_factory()
session.add_all([tag, user]) session.add_all([tag, user])
session.flush() session.flush()

View File

@ -5,7 +5,7 @@ from szurubooru import db
def get_tag_snapshot(tag): def get_tag_snapshot(tag):
ret = { ret = {
'names': [tag_name.name for tag_name in tag.names], 'names': [tag_name.name for tag_name in tag.names],
'category': tag.category 'category': tag.category.name
} }
if tag.suggestions: if tag.suggestions:
ret['suggestions'] = sorted(rel.first_name for rel in tag.suggestions) ret['suggestions'] = sorted(rel.first_name for rel in tag.suggestions)

View File

@ -45,13 +45,19 @@ def export_to_json(session):
with open(export_path, 'w') as handle: with open(export_path, 'w') as handle:
handle.write(json.dumps(output, separators=(',', ':'))) handle.write(json.dumps(output, separators=(',', ':')))
def get_by_name(session, name): def get_tag_by_name(session, name):
return session.query(db.Tag) \ return session.query(db.Tag) \
.join(db.TagName) \ .join(db.TagName) \
.filter(db.TagName.name.ilike(name)) \ .filter(db.TagName.name.ilike(name)) \
.first() .first()
def get_by_names(session, names): def get_default_category(session):
return session.query(db.TagCategory) \
.order_by(db.TagCategory.tag_category_id.asc()) \
.limit(1) \
.one()
def get_tags_by_names(session, names):
names = misc.icase_unique(names) names = misc.icase_unique(names)
if len(names) == 0: if len(names) == 0:
return [] return []
@ -60,11 +66,11 @@ def get_by_names(session, names):
expr = expr | db.TagName.name.ilike(name) expr = expr | db.TagName.name.ilike(name)
return session.query(db.Tag).join(db.TagName).filter(expr).all() return session.query(db.Tag).join(db.TagName).filter(expr).all()
def get_or_create_by_names(session, names): def get_or_create_tags_by_names(session, names):
names = misc.icase_unique(names) names = misc.icase_unique(names)
for name in names: for name in names:
_verify_name_validity(name) _verify_name_validity(name)
related_tags = get_by_names(session, names) related_tags = get_tags_by_names(session, names)
new_tags = [] new_tags = []
for name in names: for name in names:
found = False found = False
@ -76,27 +82,32 @@ def get_or_create_by_names(session, names):
new_tag = create_tag( new_tag = create_tag(
session, session,
names=[name], names=[name],
category=config.config['tag_categories'][0], category_name=get_default_category(session).name,
suggestions=[], suggestions=[],
implications=[]) implications=[])
session.add(new_tag) session.add(new_tag)
new_tags.append(new_tag) new_tags.append(new_tag)
return related_tags, new_tags return related_tags, new_tags
def create_tag(session, names, category, suggestions, implications): def create_tag(session, names, category_name, suggestions, implications):
tag = db.Tag() tag = db.Tag()
tag.creation_time = datetime.datetime.now() tag.creation_time = datetime.datetime.now()
update_names(session, tag, names) update_names(session, tag, names)
update_category(tag, category) update_category_name(session, tag, category_name)
update_suggestions(session, tag, suggestions) update_suggestions(session, tag, suggestions)
update_implications(session, tag, implications) update_implications(session, tag, implications)
return tag return tag
def update_category(tag, category): def update_category_name(session, tag, category_name):
if not category in config.config['tag_categories']: category = session.query(db.TagCategory) \
.filter(db.TagCategory.name == category_name) \
.first()
if not category:
category_names = [
name[0] for name in session.query(db.TagCategory.name).all()]
raise InvalidCategoryError( raise InvalidCategoryError(
'Category %r is invalid. Valid categories: %r.' % ( 'Category %r is invalid. Valid categories: %r.' % (
category, config.config['tag_categories'])) category_name, category_names))
tag.category = category tag.category = category
def update_names(session, tag, names): def update_names(session, tag, names):
@ -129,13 +140,13 @@ def update_names(session, tag, names):
def update_implications(session, tag, relations): def update_implications(session, tag, relations):
if _check_name_intersection(_get_plain_names(tag), relations): if _check_name_intersection(_get_plain_names(tag), relations):
raise RelationError('Tag cannot imply itself.') raise RelationError('Tag cannot imply itself.')
related_tags, new_tags = get_or_create_by_names(session, relations) related_tags, new_tags = get_or_create_tags_by_names(session, relations)
session.flush() session.flush()
tag.implications = related_tags + new_tags tag.implications = related_tags + new_tags
def update_suggestions(session, tag, relations): def update_suggestions(session, tag, relations):
if _check_name_intersection(_get_plain_names(tag), relations): if _check_name_intersection(_get_plain_names(tag), relations):
raise RelationError('Tag cannot suggest itself.') raise RelationError('Tag cannot suggest itself.')
related_tags, new_tags = get_or_create_by_names(session, relations) related_tags, new_tags = get_or_create_tags_by_names(session, relations)
session.flush() session.flush()
tag.suggestions = related_tags + new_tags tag.suggestions = related_tags + new_tags

View File

@ -12,12 +12,12 @@ class InvalidPasswordError(errors.ValidationError): pass
class InvalidRankError(errors.ValidationError): pass class InvalidRankError(errors.ValidationError): pass
class InvalidAvatarError(errors.ValidationError): pass class InvalidAvatarError(errors.ValidationError): pass
def get_by_name(session, name): def get_user_by_name(session, name):
return session.query(db.User) \ return session.query(db.User) \
.filter(func.lower(db.User.name) == func.lower(name)) \ .filter(func.lower(db.User.name) == func.lower(name)) \
.first() .first()
def get_by_name_or_email(session, name_or_email): def get_user_by_name_or_email(session, name_or_email):
return session.query(db.User) \ return session.query(db.User) \
.filter( .filter(
(func.lower(db.User.name) == func.lower(name_or_email)) (func.lower(db.User.name) == func.lower(name_or_email))
@ -40,7 +40,7 @@ def create_user(session, name, password, email, auth_user):
def update_name(session, user, name, auth_user): def update_name(session, user, name, auth_user):
if misc.value_exceeds_column_size(name, db.User.name): if misc.value_exceeds_column_size(name, db.User.name):
raise InvalidNameError('User name is too long.') raise InvalidNameError('User name is too long.')
other_user = get_by_name(session, name) other_user = get_user_by_name(session, name)
if other_user and other_user.user_id != auth_user.user_id: if other_user and other_user.user_id != auth_user.user_id:
raise UserAlreadyExistsError('User %r already exists.' % name) raise UserAlreadyExistsError('User %r already exists.' % name)
name = name.strip() name = name.strip()