How We Implement JWT Authentication

Doubletapp
13 min readFeb 5, 2024

Hello! My name is Danil, and I’m a backend developer at Doubletapp. Almost all of our projects involve users who can log into the system. This means that we almost always need authentication. We use JWT-based authentication, which combines the simplicity of implementation and security for applications.

There are many materials on the internet explaining what JWT is and how to use it. However, most examples limit themselves to issuing a token for the user. In this article, I want to tell not only about what JWT is but also how to implement working with access and refresh tokens and solve related issues. There will be a bit of theory and a lot of practice. Get comfortable, we’re about to start.

What is JSON Web Token?

If you are already familiar with JWT technology and don’t need the theory, you can skip this part and go straight to the implementation.

Definition

JWT (JSON Web Token) is an open standard for creating access tokens, based on the JSON format. It is commonly used for transferring data for user authentication in client-server applications. Tokens are created by the server, signed with a secret key, and passed to the user, who will then use them to confirm their identity.

In an application where the authorization is based on JWT, for a malicious actor to gain access to a user’s content, they would need to obtain the user’s access token.

Structure of the token

The token consists of three parts: header, payload и signature. The first two are JSON encoded using base64. The signature is the token’s signature.

Header: The header contains two fields: alg (signature algorithm)* and typ (token type). In decoded form, it looks like this:

{
"alg": "HS256",
"typ": "JWT"
}

* commonly used algorithms are HS256 or RS256, but other signature encryption algorithms are also allowed by the standard.

Encoded, it looks like this: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9

Payload: The payload contains useful information about the user and the token. Some keys are reserved by the standard, but all of them are optional:

  • iss (issuer) — token issuer
  • sub (subject) — subject to whom the token is issued
  • aud (audience) — recipients for whom this token is intended
  • exp (expiration time) — the time when the token will become invalid
  • nbf (not before) — the time from which the token should be considered valid
  • iat (issued at) — the time at which the token was issued
  • jti (JWT ID) — unique identifier of the token

Besides these keys, you can invent and add any others you need.

Here’s how the payload of a token issued to befunny@doubletapp.ai might look:

{
"message": "something info",
"sub": "befunny@doubletapp.ai"
}

Or in base64: eyJtZXNzYWdlIjoiSGVsbG8sIEhhYnIhIiwic3ViIjoiYmVmdW5ueUBkb3VibGV0YXBwLmFpIn0=

Important! Anyone can decode the token (for example, on jwt.io). Therefore, never transmit compromising information in it: sensitive user data, passwords, etc.

Signature: The token signature is created as follows:

signature = HMAC_SHA256(secret, base64urlEncoding(header) + '.' + base64urlEncoding(payload))

Concatenate the base64-encoded header and payload using a period as a separator. Encode the resulting string using the chosen algorithm and a secret key.

The secret is a key for encrypting and verifying the signature. It is generated and stored on the server and is used to sign the token during generation. It is also needed to check the token upon receipt. It is important to ensure confidentiality, i.e., the inaccessibility of this key. The reason is that with its help, it becomes possible to create any tokens for your application that will be recognized as valid. The main requirement for the key is resistance to brute force. Usually, it is generated in hex format.

The final token is a string consisting of the three previously described parts joined by periods. According to the example above, our token will look like this:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJiZWZ1bm55QGRvdWJsZXRhcHAuYWkiLCJtZXNzYWdlIjoiSGVsbG8sIEhhYnIhIn0.FAMoE435ZafgdICuc6181RsEuR5V1J7dJkzhZRWQk1Y

Why it works

Why does this token format guarantee data integrity and prevent substitution? The trick is that to verify the authenticity of the token, it’s sufficient to take the header and payload from it, obtain the signature based on the algorithm mentioned above, and compare the calculated signature with the one actually present in the token.

If a malicious user decides to add extra information to their token or change the user to whom it was issued, the token will be deemed invalid due to the mismatch between the actual and calculated signatures, and the server will reject the request.

Usage and implementation

Next, I will provide examples of code in Python 3.10. PyJWT will be used for encoding and decoding JWT, and FastAPI will be used as the web framework.

Simple JWT implementation

The simplest scenario of using JWT tokens is as follows:

  • The user registers/logs into the system, and a token is issued to them.
  • This token is saved on the client side.
  • For each subsequent request, the client includes the token in the header: Authorization: Bearer <token>.
  • The server, upon receiving a request with such a header, checks its validity. If successful, it sends the requested content.

Let’s implement the scenario:

Generate a token.

token = jwt.encode(payload={'sub': user.id}, key=JWT_SECRET, algorithm='HS256')

Check its validity upon receiving a request. This can be placed in a separate middleware:

async def check_access_token(
request: Request,
authorization_header: str = Security(APIKeyHeader(name='Authorization', auto_error=False))
) -> str:
# Check if the token is passed
if authorization_header is None:
raise JsonHTTPException()

# Check the token for compliance
if 'Bearer ' not in authorization_header:
raise JsonHTTPException()

# Remove unnecessary from the token
clear_token = authorization_header.replace('Bearer ', '')

try:
# Check the validity of the token
payload = decode(jwt=clear_token, key=JWT_SECRET, algorithms=['HS256', 'RS256'])
except InvalidTokenError:
# В случае невалидности возвращаем ошибку
raise JsonHTTPException()

# Identify the user
user = await APIUser.filter(id=payload['sub']).first()
if not user:
raise JsonHTTPException()

request.state.user = user

return authorization_header

You might rightfully ask, “Why bother appending this Bearer to the token if it’s going to be discarded anyway?” — it’s just the way it has been done.

This is the simplest way to use JWT. It has both pros and cons.

Pros:

  • Simplicity.
  • Nothing needs to be stored on the server side.

Cons:

  • The token is issued once and for all. For example, if Mallory intercepts Bob’s token, she will gain eternal access to his data.
  • The only way to revoke Bob’s token is to change the secret. However, this will break the tokens of all other users.

Access and refresh tokens

Let’s address the issue that one token can be used indefinitely. To do this, let’s change the interaction scheme between the client and the server. Tokens will be of two types:

Access Token: Provides access to information, usually with a lifespan of a few minutes.

Refresh Token: A refresh token that can be used to obtain a new pair of tokens; its lifespan is measured in days.

Usage scenario:

  • The user registers and receives a pair of tokens: access and refresh.
  • For all their requests, they accompany the access token and receive a response “(as before, with a regular jwt token)”.
  • When the lifespan of the access token has expired or is about to expire, the user (or the client application) sends their refresh token to the server, which revokes it and returns a new pair.

What happens if the refresh token expires? The user will need to go through the authorization process to confirm their identity and obtain a new pair of tokens.

With this approach, the user will have access to the content without the constant need for new authentication. Also, the damage from a potential interception of the access token by a malicious actor will be relatively small due to the short lifespan of the token.

In other words, when Bob registers and receives his pair of tokens, he can use the application without constantly entering his username and password. However, if Mallory somehow intercepts his access token, her happiness will be short-lived — the token’s lifespan will soon expire, and she won’t have a refresh token to renew it. We’ll discuss what happens if a refresh token is intercepted a bit below.

Implementation

Previously, tokens were created using a simple scheme. Now it will be a bit more complicated. Let’s break down the token signing method:

def __sign_token(self,
type: str, subject: str,
payload: вict[str, Any]={},
ttl: timedelta=None
) -> str:
"""
Keyword arguments:
type -- token type(access/refresh);
subject -- subject for which the token is issued;
payload -- payload to add to the token;
ttl -- token lifespan
"""
# Take the current UNIX time
current_timestamp = convert_to_timestamp(datetime.now(tz=timezone.utc))

# Collect the token payload:
data = dict(
# Specify ourselves as the issuer
iss='befunny@auth_service',
sub=subject,
type=type,
# Generate a token identifier randomly ( UUID )
jti=self.__generate_jti(),
# Set the issuance time to the current time
iat=current_timestamp,
# Set the token's start time to the current time or what was passed in the payload
nbf=payload['nbf'] if payload.get('nbf') else current_timestamp
)
# Add exp - the time after which the token will become invalid, if ttl is passed
data.update(dict(exp=data['nbf'] + int(ttl.total_seconds()))) if ttl else None
# Update the initial payload with the resulting dictionary
payload.update(data)

return jwt.encode(payload=payload, key=JWT_SECRET, algorithm='HS256')

Return a pair of tokens to the user during registration:

@auth_api.post('/register', response_model=AuthOutput)
async def register(body: AuthInput):
...

user = await AuthenticatedUser.create(
login=body.login,
password_hash=hash_password(body.password),
)

access_token = jwt_auth.generate_access_token(subject=user.login)
refresh_token = jwt_auth.generate_refresh_token(subject=user.login)

return AuthOutput(access_token=access_token, refresh_token=refresh_token)

Let’s enhance our middleware with token type verification to prevent using a refresh token as an access token:

try:
payload = ...
if payload['type'] != TokenType.ACCESS.value:
raise JsonHTTPException()
except InvalidTokenError:
...

Add a route to refresh tokens:

async def update_tokens(self, user: APIUser, refresh_token: str) -> ...:
payload, error = try_decode_token(jwt_auth=self._jwt_auth, token=refresh_token)
if error:
return ...

if payload['type'] != TokenType.REFRESH.value:
return ...

access_token, refresh_token = self._issue_tokens_for_user(user, device_id)
return Tokens(access_token=access_token, refresh_token=refresh_token)

Voilà! We have significantly more code now and a bit more security.

However, the possibility for a malicious actor to gain unlimited access to others’ data hasn’t disappeared. Mallory just needs to intercept the refresh token now, not the access token. In this case, she will again have perpetual access to Bob’s content, just like with a regular JWT.

Revoking tokens

Let’s address the issue and learn how to revoke refresh tokens. If Bob notices suspicious activity, he can press a button to revoke the issued tokens.

For this, in the database, we need to create a table with token data: its identifier, owner, and status (revoked or not):

class IssuedJWTToken(Model):
jti = fields.CharField(max_length=36, pk=True)
subject = fields.ForeignKeyField(model_name='models.APIUser', on_delete='CASCADE', related_name='tokens')
revoked = fields.BooleanField(default=False)

The algorithm won’t change fundamentally. When refreshing, we’ll simply start revoking all previously issued refresh tokens.

Implementation

Save the jti of the new refresh token during registration and token refresh:

await IssuedToken.create(subject=user, jti=jwt_auth.get_jti(refresh_token))

When refreshing, revoke all previously issued refresh tokens for the user:

payload, _ = try_decode_token(jwt_auth, body.refresh_token)
await IssuedToken.filter(jti=payload['jti']).update(revoked=True)

This could be a stopping point, but let’s consider the following situation:

  • Bob has a stolen refresh_token_1.
  • Bob uses refresh_token_1 to get a new pair of tokens.
  • The service returns refresh_token_2 and access_token_2, revoking the previous ones.
  • Mallory also tries to use refresh_token_1 to get a new pair of tokens for herself and freely use the application on behalf of Bob.

Mallory will see an error on the screen. But what if Mallory refreshed the first token? At the application level, we can’t be sure who refreshed the tokens first and who is currently using the application — the user or the malicious actor. Therefore, we shouldn’t forget about security if we know that the token was stolen.

Auth0 suggests the following: if a refresh token is used again, immediately invalidate the entire family of these tokens.

The subsequent steps would be:

  • The service recognizes that refresh_token_1 is being used again and immediately invalidates the entire family of refresh tokens, including refresh_token_2.
  • The service sends Mallory a denial of access response.
  • The lifespan of access_token_2 expires, and Bob tries to use refresh_token_2 to get a new pair of tokens. The service denies him access. Reauthentication is needed.

Let’s implement this scheme by adding one condition to the logic of the token refresh route:

if await check_revoked(payload['jti']):
await IssuedJWTToken.filter(jti=payload['jti']).update(revoked=True)
return None, AccessError.get_token_already_revoked_error()

The logic of this handler will be almost identical to any token revocation we’ve done before.

Now we have the ability to revoke a refresh token in case of suspicious activity.

Why do we only revoke the refresh token? Revoking the access token is not always necessary since its lifespan is measured in minutes. However, for greater security, let’s add the ability to revoke access tokens as well.

Save data about the access token in IssuedJWTToken similar to the refresh token.

In places where we revoke tokens, change filter(jti=jti) to filter(subject=user) so that it looks like this:

await IssuedToken.filter(subject=user).update(revoked=True)

Let’s add a check in our middleware to see if the token has been revoked:

if await check_revoked(payload['jti']):
raise JsonHTTPException(content=dict(AccessError.get_token_revoked_error()), status_code=403)

This way, we’ve addressed all the shortcomings mentioned earlier. Tokens now live no longer than strictly allocated time, and in extreme cases, we have the ability to revoke both access and refresh tokens.

Mallory intercepted the access token and started sending inappropriate messages on behalf of Bob? Not a problem; in the worst case, she will use it for a few minutes, after which she will lose access. If Bob notices suspicious activity quickly, he can revoke all issued tokens even earlier.

Mallory intercepted the refresh token? Also not a problem. At one moment, Mallory and Bob will try to refresh using the same refresh token and will be redirected to the application login screen. Bob logs in again, and Mallory is left with nothing.

However, while solving these problems, we forgot that a user may have multiple devices.

Access from multiple devices

All the above works fine, not considering that a user may have multiple devices. The application won’t be able to distinguish Mallory from Bob, who decided to visit our site from his phone. In the current version, authorization on one device will break tokens on another, which is not very user-friendly.

Let’s start additionally storing a unique device identifier — device_id.

Update our database table:

class IssuedJWTToken(Model): 
jti = ...
subject = ...
device_id = fields.CharField(max_length=36)
revoked = ...

We’ll add this identifier to the token payload. When updating tokens, we’ll refine the filtering for revocation:

device_id = payload['device_id'] 
await IssuedJWTToken.filter(subject=user, device_id=device_id).update(revoked=True)

However, attempting to refresh a revoked token should still break everything. For convenience in the middleware code, along with saving the user, we’ll save the device_id:

request.state.device_id = payload['device_id']

Now we’ve added the ability to log out from a specific device. To do this, it will be enough to revoke all tokens issued to a specific device, as we did during refresh.

A valid question might arise: “Where do we get this device_id?” We can generate a UUID. This is the simplest option. But in case of need, the identifier could be, for example, the IDFA on iOS or the Advertising ID on Android.

Deleting old data

With each new feature, we have been loading more and more onto our database. Now, let’s remember that we don’t need to store records for tokens that have already expired. What to do with this?

Store the expiration time in IssuedJWTToken:

class IssuedJWTToken(Model): 
subject = ...
jti = ...
device_id = ...
revoked = ...
expired_time = fields.IntField() # don't forget that UNIX time.

Periodically, delete all records whose expiration time has come. This can be done with any scheduler and the following lines:

current_timestamp = convert_to_timestamp(datetime.now(tz=timezone.utc)) 
await IssuedJWTToken.filter(expired_time__lt=current_timestamp).delete()

GitHub with the implementation of all the above: github.com/doubletapp/habr-jwt-auth-example.

Useful links:

  • RFC7519 — JSON Web Token standard
  • auth0.com — documentation on how Auth0 uses JWT for user identification

Conclusion

So, we have discussed what JWT tokens are and how to work with them. We wrote a simple implementation using a single eternal token and enhanced it to be able to revoke tokens, i.e., terminate user sessions.

JWT is a convenient tool for user authentication checks. The main difference from other tokens is that it is very easy to decrypt and see what’s inside. At the same time, it cannot be forged without knowing the key thanks to the signature. However, it should be noted that anyone can decode this data. Therefore, in JWT, you cannot store anything compromising: users’ personal data, passwords, and the like.

--

--