Afroza Nowshin

Graduate Research Assistant at The University of Toledo

How to Do Periodic Tasks in a Django REST API Using Celery and Redis? - A Beginner-Friendly Guide


October 25, 2023

Today, I want to share how to solve an interesting problem I recently encountered. I was a noob at this and had to learn some concepts I had never worked with before. Here is the problem statement:

Create an API where you take your friend's location, destination, and travel date. Compare the temperature of those two locations at 2 PM on that day and return a response deciding whether they should travel there. Hint: You might need to periodically fetch data and store it somewhere.

For fetching data periodically, we are going to use Celery, and for storing the data, we are going to use Redis.

Here is the diagram of the whole task flow:

[Figure: Celery task flow diagram]

The breakdown of the task is:

  • Build a background system for periodical fetching of the weather data (Steps 2 - 7)

  • Create an API for the recommendation task (Step 8)

  • Extract the places from the API request and match them with the stored data, while the data keeps being updated in parallel (Steps 8, 9)

Celery helps you with asynchronous task management. Celery has 'workers' that execute your asynchronous tasks in the background and a 'beat' scheduler that triggers a particular task at the time interval you set, like a cron job. You can learn more from the documentation - https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html

Let's get started.

1. Install Dependencies

Celery works best with Python 3.7+ versions; therefore, set up a virtual environment based on one of these versions. Redis, on the other hand, needs to be installed in your OS. First, if you are on Linux or WSL, run:

sudo apt update && sudo apt upgrade

sudo apt install redis-server

In your terminal, run redis-cli and you will get the Redis shell. To check whether Redis is connected, type ping in the Redis shell. If it replies PONG, the installation and connection are successful.

Now in your python virtual environment, run

pip install django djangorestframework celery redis django-redis django-celery-results django-celery-beat requests

2. Edit settings.py File

First, create a Django project and an app within it.

django-admin startproject travelmanagement
cd travelmanagement
python manage.py startapp apilist

In order for your app to work with both Celery and Redis, you need to make some changes in your Django project's settings.py file. Add the following snippet to your settings.py file:

```{Python}
CELERY_BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_IMPORTS = ('apilist.tasks',)  # we will implement this module in the next step; generally it is appname.file_name
CELERY_TIMEZONE = 'Asia/Dhaka'

CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
```
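These settings refer to several third-party packages, and Django only loads their models and migrations once they are registered. Here is a minimal sketch of the relevant INSTALLED_APPS entries, assuming the apilist app from this guide and that django-celery-beat, django-celery-results, and Django REST framework are installed. The post does not show this list, so treat it as an assumption rather than the exact project configuration:

```python
# settings.py (fragment): the first six entries are Django's defaults.
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'rest_framework',         # provides APIView, Response, and status for the API view
    'django_celery_results',  # needed for CELERY_RESULT_BACKEND = 'django-db'
    'django_celery_beat',     # needed for the DatabaseScheduler
    'apilist',                # the app created with startapp
]
```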

For best practice, keep all these in a .env file and load using python-dotenv library. Let's do the setup for the redis cache:

```{Python}
CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': os.getenv('REDIS_LOCATION'),  # needs `import os` at the top of settings.py
        'OPTIONS': {
            # A dict keeps only the last duplicate key, so all options live in one 'OPTIONS' entry
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
            'CONNECTION_POOL_KWARGS': {'max_connections': 100},
            'IGNORE_EXCEPTIONS': True,  # Ignore cache-related exceptions
        },
        'KEY_PREFIX': 'my_cache_prefix',  # Add a prefix to your cache keys
        # Cache timeout in seconds; keep it longer than the 15-minute refresh
        # interval, otherwise entries expire before the next task run
        'TIMEOUT': 60 * 60,
        'VERSION': 1,  # Cache version (change this if you change your cache keys structure)
    }
}
```
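As a rough sketch of the .env advice, the connection values can be read from environment variables with fallbacks for local development. REDIS_LOCATION is the variable used in the CACHES setting; the CELERY_BROKER_URL variable name, the fallback URLs, and the env_setting helper are assumptions made for illustration. If you use python-dotenv, calling load_dotenv() before these lines would populate the environment from the .env file.

```python
import os


def env_setting(name, default):
    """Return the environment variable `name`, or `default` if it is unset or empty."""
    value = os.environ.get(name, "")
    return value if value else default


# Hypothetical fallbacks for local development; adjust them to your own setup.
CELERY_BROKER_URL = env_setting("CELERY_BROKER_URL", "redis://127.0.0.1:6379")
REDIS_LOCATION = env_setting("REDIS_LOCATION", "redis://127.0.0.1:6379/1")
```

Falling back to a default instead of None means a missing .env file shows up as a connection to the local Redis instance rather than a confusing configuration error.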

3. Write the tasks.py File

Create a file named tasks.py inside your app folder. Here, you will write the function for the asynchronous task. For this project, we are required to fetch district-wise 2 p.m. temperature data periodically and store it. I had two URLs, one is a JSON for all districts, and another is an Open Meteo API link. First, we need to find out the structure of the JSON object, traverse it, and store it in a list. Later, you need to save the list in the redis cache with a key. Just like cookies, redis caches have two methods - set and get.

cache.set("your key", your_data)

The whole code will look like the following; print statements are for debugging purposes:

from django.core.cache import cache
from celery import shared_task
import requests


@shared_task
def fetch_and_store_temperature():
    try:
        response = requests.get('https://raw.githubusercontent.com/strativ-dev/technical-screening-test/main/bd-districts.json')
        response.raise_for_status()

        districts_data = response.json().get('districts', [])

        all_temperatures = []

        for district in districts_data:
            latitude = district.get('lat')
            longitude = district.get('long')

            if latitude is not None and longitude is not None:
                api_url = f'https://api.open-meteo.com/v1/forecast?latitude={latitude}&longitude={longitude}&hourly=temperature_2m&timezone=GMT&forecast_days=7'

                weather_response = requests.get(api_url)
                weather_response.raise_for_status()

                weather_data = weather_response.json()
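                hourly_data = weather_data.get('hourly', {})
                # Note: 'temperature_2m' is the air temperature at 2 m above ground, so this
                # list holds every hourly reading for the 7 forecast days, not only 2 PM values
                temperature_at_2pm = hourly_data.get('temperature_2m', [])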

                # Cache key for each district without specifying a travel date
                cache_key = f'temperature_at_2pm_{district["name"]}'
                cache.set(cache_key, temperature_at_2pm)

                # print("checking", cache.get(cache_key))

                all_temperatures.extend(temperature_at_2pm)

        # Store all temperatures in a single cache key
        cache.set('temperature_data', all_temperatures)

    except requests.exceptions.RequestException as e:
        # Handle API request exceptions
        print(f"Error fetching weather data: {e}")
    except Exception as e:
        # Handle other exceptions
        print(f"Error: {e}")

You must use the @shared_task decorator for any function that you want to run asynchronously.
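Since temperature_2m in the Open-Meteo response is the full hourly series, picking out only the 2 PM readings takes one more step. Here is a minimal sketch, assuming the hourly object contains a time list of ISO 8601 strings such as 2023-10-25T14:00 that runs parallel to temperature_2m. The helper name temperatures_at_hour is hypothetical and not part of the original project. Also keep in mind that the request uses timezone=GMT, so 14:00 in the response is 2 PM GMT, not local time.

```python
def temperatures_at_hour(hourly, hour=14):
    """Map each forecast date to the temperature reported at the given hour.

    `hourly` is expected to look like the 'hourly' object of the weather
    response: parallel 'time' and 'temperature_2m' lists.
    """
    times = hourly.get("time", [])
    temperatures = hourly.get("temperature_2m", [])
    wanted_prefix = f"{hour:02d}:"

    result = {}
    for timestamp, temperature in zip(times, temperatures):
        # Split '2023-10-25T14:00' into the date and the time of day
        date_part, _, time_part = timestamp.partition("T")
        if time_part.startswith(wanted_prefix):
            result[date_part] = temperature
    return result
```

Inside the task, something like temperatures_at_hour(hourly_data) could be cached per district instead of the raw list, which would let the API look up the temperature for a specific travel date.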

4. Write the celery.py File

Go to your project directory (the folder that contains settings.py) and create a file named celery.py. This configuration is needed for scheduling your tasks. Before that, open the project's __init__.py file and add the following lines:


from .celery import app as celery_app

__all__ = ("celery_app",)

We will now set the necessary configurations in the celery.py file. Let's say, we want to execute our function every 15 minutes. Here is the code for that:

from __future__ import absolute_import, unicode_literals
import os

from celery import Celery
from celery.schedules import crontab
from django.conf import settings


os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'travelmanagement.settings') # project name.settings

app = Celery('travelmanagement') # provide your project name
app.conf.enable_utc = False

app.config_from_object(settings, namespace='CELERY')

app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f"Request: {self.request!r}")


app.conf.CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

app.conf.beat_schedule = {
    # each entry describes one scheduled task
    'fetch-and-store-temp-data-contrab': {
        'task': 'apilist.tasks.fetch_and_store_temperature',  # app_name.tasks.function_name
        'schedule': crontab(minute='*/15'),  # every 15 minutes; a bare crontab() would run every minute
        # 'args': (..., ...)  # in case the function takes parameters, add them here
    }
}

crontab schedules the task for you. Additionally, if your asynchronous function takes arguments, uncomment the 'args' line and make the necessary changes. A list of crontab expressions can be found in the official documentation - https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html

5. Clear pycache

Sometimes, if the worker code changes too frequently, Celery may not pick up the changes. In that case, delete your SQLite3 database and run the following command, which deletes all the __pycache__ files. Do not forget to add __pycache__ to your .gitignore to avoid pushing those files.

py3clean .

6. Run Migrations

If your task is still running based on old code, clear the migration files using the following commands (for Linux):

find . -path "*/migrations/*.py" -not -name "__init__.py" -delete
find . -path "*/migrations/*.pyc"  -delete

Now run the migrations. Because you are using celery, you need to use 3 commands, instead of the usual 2:

python manage.py makemigrations
python manage.py migrate
python manage.py migrate django_celery_beat

7. Run Celery workers and beats

Everything is configured. Now open three terminals, two for Celery and one for Django. Run the following in the first terminal:


celery -A your_project_name worker -l info

Run the following command in the second terminal to start Celery beat:

celery -A your_project_name beat -l info

And finally, run the server in the last terminal.

python manage.py runserver

8. Create a view for the API

This part is about creating your API for travel suggestions. Here is what you need to do:

  • Extract the districts - where you are and where you want to go
  • Extract the temperature data from the cache using the get method
  • As it is a lot of data, I averaged the temperatures of each district and compared the averages. If my location has the higher average temperature, the API recommends travelling to the destination district; otherwise it does not.

Here is how you can do it:

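from django.core.cache import cache
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView


class DecisionMakingAPIView(APIView):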

    def post(self, request):
        try:
            data = request.data
            source_district_name = data.get('source_district_name')
            destination_district_name = data.get('destination_district_name')

            # Retrieve temperatures for source district
            source_cache_key = f'temperature_at_2pm_{source_district_name}'
            source_temperature_at_2pm = cache.get(source_cache_key)

            # Retrieve temperatures for destination district
            destination_cache_key = f'temperature_at_2pm_{destination_district_name}'
            destination_temperature_at_2pm = cache.get(destination_cache_key)

            # Both lists must exist and be non-empty (an empty list would cause a division by zero)
            if source_temperature_at_2pm and destination_temperature_at_2pm:
                # Calculate average temperature for the source district
                source_average_temperature = sum(source_temperature_at_2pm) / len(source_temperature_at_2pm)

                # Calculate average temperature for the destination district
                destination_average_temperature = sum(destination_temperature_at_2pm) / len(destination_temperature_at_2pm)

                # Compare average temperatures: recommend the trip only if the destination is cooler
                if source_average_temperature > destination_average_temperature:
                    recommendation = f"Traveling from {source_district_name} to {destination_district_name} is Recommended as destination is cooler."
                else:
                    recommendation = f"Traveling from {source_district_name} to {destination_district_name} is NOT Recommended as destination is not cooler."

                return Response({'recommendation': recommendation}, status=status.HTTP_200_OK)
            else:
                return Response({'error': 'Temperature data not available for the specified source or destination district.'}, status=status.HTTP_404_NOT_FOUND)

        except Exception as e:
            # Handle exceptions if necessary
            return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
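The comparison rule described in this step can also be written as a small standalone function, which makes it easy to test without a running server or cache. This is only a sketch of the rule as stated in this post; the names average and recommend_travel are hypothetical and do not appear in the original project.

```python
def average(values):
    """Arithmetic mean of a non-empty list of numbers."""
    return sum(values) / len(values)


def recommend_travel(source_temps, destination_temps):
    """Return True if the destination is cooler on average, False otherwise.

    Returns None when either list is missing or empty, which mirrors the
    'data not available' branch of the view.
    """
    if not source_temps or not destination_temps:
        return None
    return average(destination_temps) < average(source_temps)
```

For example, recommend_travel([32.0, 34.0], [28.0, 30.0]) compares an average of 33.0 against 29.0 and therefore recommends the trip.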

9. Add the URL for the API

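Lastly, create a urls.py file inside your app folder and add the following. Remember to also include this file from the project's urls.py (for example with include('apilist.urls')) so that Django can route requests to it: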

from django.urls import path
from . import views
urlpatterns = [
 path('recommendation/', views.DecisionMakingAPIView.as_view(), name='travel-recommendation')
]

10. Check the API response

In Postman, add the form data as shown in the following screenshot and check the response. Done!

[Figure: Postman request and response output]
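If you prefer a script over Postman, the same request can be built with Python's standard library. This is a minimal sketch, assuming the development server runs at http://127.0.0.1:8000 and that the app's urls.py is included at the root of the project; adjust the base URL if your project mounts it under a prefix. The district names are example values, and build_recommendation_request is a hypothetical helper.

```python
import json
from urllib import request


def build_recommendation_request(source, destination,
                                 base_url="http://127.0.0.1:8000"):
    """Build a POST request carrying the two district names as JSON."""
    payload = json.dumps({
        "source_district_name": source,
        "destination_district_name": destination,
    }).encode("utf-8")
    return request.Request(
        url=f"{base_url}/recommendation/",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_recommendation_request("Dhaka", "Sylhet")

# With the development server running, the request can be sent like this:
# with request.urlopen(req) as response:
#     print(json.loads(response.read()))
```

Note that urlopen raises urllib.error.HTTPError for the 404 and 500 responses of the view, so wrap the call in a try block if you want to read the error message.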

I have written this blog post so that I can use it as a future reference. You can find the project on GitHub in case you want the code - https://github.com/Afroza2/Strativ-AB-Travel-Management.git