Relationship Model
[models.py]
...
class User(UserMixin, Model):
...
def following(self):
"""Users that current user follows."""
return User.select().join( Relationship, on=Relationship.to_user
).where( Relationship.from_user == self )
def followers(self):
"""Users that follow the current user."""
return User.select().join( Relationship, on=Relationship.from_user
).where( Relationship.to_user == self )
@classmethod
def create_user(cls, username, email, password, admin=False):
try:
...
class Relationship(Model):
from_user = ForeignKeyField(User, related_name='relationships')
to_user = ForeignKeyField(User, related_name='related_to')
class Meta:
database = DATABASE
indexes = (
(('from_user','to_user'), True )
)
Relationship( Model )
It is a model that is constituted of 2 attributes/columns only, both are foreign keys to the User model. This makes each record in the Relationship model represent a relationship between 2 records in the User model.
from_user = ForeignKeyField( User, related_name=’relationships’ )
Relationships coming from other users (i.e. followers).
to_user = ForeignKeyField( User, related_name=’related_to’ )
Relationships going to other users (i.e. following).
indexes = ( ((‘from_user’,’to_user’), True) )
Indexes tells the database how to find that data. But it also specifies if an index is unique. This creates a unique index from from_user and to_user, where True indicates the uniqueness of this index.
.join( )
.select().join( Relationship, on=Relationship.to_user).where( Relationship.from_user == self )
Used to create a query that joins 2 models together (e.g. User & Relationship) on a certain attribute from the joined model.
Follow/Unfollow Views
[app.py]
...
@app.route('/follow/<username>')
@login_required
def follow():
try:
to_user = models.User.get(models.User.username**username)
except models.DoesNotExist:
pass
else:
try:
models.Relationship.create( from_user=g.user._get_current_object(), to_user=user )
except IntegrityError:
pass
else:
flash("You have successfully followed {}!".format(to_user.username), "success")
return redirect(url_for('stream', username=to_user.username))
@app.route('/unfollow/<username>')
@login_required
def unfollow():
try:
to_user = models.User.get(models.User.username**username)
except models.DoesNotExist:
pass
else:
try:
models.Relationship.get(from_user=g.user._get_current_object(), to_user=user).delete_instance()
except IntegrityError:
pass
else:
flash("You have unfollowed {}.".format(to_user.username), "success")
return redirect(url_for('stream', username=to_user.username))
@app.route('/')
def index():
...
models.Relationship.create( from_user=g.user._get_current_object( ) , to_user=user )
The moment a new relationship record is created from two User primary keys.
username=to_user.username
We are sending the username to the ‘stream’ view only for the sake of displaying it in the flash message.