Scheduled Tasks

Although the HTTP Server developed through the framework follows a request-response model, there are still many scenarios that require the execution of scheduled tasks, such as:

  1. Periodically reporting application status.
  2. Periodically updating local cache from remote interfaces.
  3. Periodically performing file splitting and deleting temporary files.

The framework provides a mechanism to make the writing and maintenance of scheduled tasks more elegant.

Writing Scheduled Tasks

All scheduled tasks are stored in the app/schedule directory. Each file is an independent scheduled task that defines both its scheduling properties and the method to execute.

As a simple example, to define a scheduled task that loads remote data into an in-memory cache, you can create a file named update_cache.js in the app/schedule directory.

const Subscription = require('egg').Subscription;

class UpdateCache extends Subscription {
  // Set the execution interval and other configurations through the schedule property
  static get schedule() {
    return {
      interval: '1m', // 1 minute interval
      type: 'all', // Specify that all workers need to execute
    };
  }

  // subscribe is the function that runs when the scheduled task is executed
  async subscribe() {
    const res = await this.ctx.curl('http://www.api.com/cache', {
      dataType: 'json',
    });
    this.ctx.app.cache = res.data;
  }
}

module.exports = UpdateCache;

It can also be simplified to:

module.exports = {
  schedule: {
    interval: '1m', // 1 minute interval
    type: 'all', // Specify that all workers need to execute
  },
  async task(ctx) {
    const res = await ctx.curl('http://www.api.com/cache', {
      dataType: 'json',
    });
    ctx.app.cache = res.data;
  },
};

This scheduled task will execute once every minute on each Worker process, fetching remote data and mounting it to app.cache.

Task

  • Both task and subscribe can be written as generator functions or async functions.
  • task receives ctx, an anonymous Context instance, which can be used to call services and other context-bound APIs.
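
As a minimal sketch of calling a service through ctx, the task below delegates the fetch to a service method. The file name and the ctx.service.cache.refresh() method are assumptions made for illustration; they stand in for whatever service the application defines and are not framework APIs.

```javascript
// app/schedule/refresh_cache.js (hypothetical file name)
const refreshCache = {
  schedule: {
    interval: '1m', // 1 minute interval
    type: 'all', // every worker keeps its own copy of the cache
  },
  // ctx is the anonymous Context instance passed in by the framework
  async task(ctx) {
    // ctx.service.cache.refresh is an assumed application service, not a framework API
    const data = await ctx.service.cache.refresh();
    ctx.app.cache = data;
  },
};

module.exports = refreshCache;
```

Keeping the fetch logic in a service means the same code can be reused by controllers and by the scheduled task.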

Scheduling Methods

A scheduled task uses one of two scheduling methods: interval or cron.

interval

The execution timing of the scheduled task can be configured through the schedule.interval parameter, and the scheduled task will execute once at the specified time interval. The interval can be configured as:

  • A numeric type, measured in milliseconds, for example, 5000.
  • A string type, which will be converted to milliseconds using ms, for example, 5s.

module.exports = {
  schedule: {
    // Execute every 10 seconds
    interval: '10s',
  },
};
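
To make the two accepted forms concrete, here is a rough sketch of how a numeric or string interval maps to milliseconds. The helper toMilliseconds is hypothetical and handles only a small subset of unit suffixes; it is not the ms library, which the framework actually uses and which accepts more formats.

```javascript
// Illustrative subset of duration parsing; NOT the ms library itself.
const UNIT_MS = {
  ms: 1,
  s: 1000,
  m: 60 * 1000,
  h: 60 * 60 * 1000,
  d: 24 * 60 * 60 * 1000,
};

// Hypothetical helper: numbers pass through, strings like '10s' are converted.
function toMilliseconds(interval) {
  if (typeof interval === 'number') return interval;
  const match = /^(\d+(?:\.\d+)?)\s*(ms|s|m|h|d)$/.exec(String(interval).trim());
  if (!match) throw new TypeError(`Unsupported interval: ${interval}`);
  return Number(match[1]) * UNIT_MS[match[2]];
}

console.log(toMilliseconds(5000)); // 5000
console.log(toMilliseconds('5s')); // 5000
console.log(toMilliseconds('10s')); // 10000
console.log(toMilliseconds('1m')); // 60000
```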

cron

The execution timing of the scheduled task can be configured through the schedule.cron parameter, and the scheduled task will execute at specific times according to the cron expression. The cron expression is parsed using cron-parser.

Note: cron-parser supports an optional leading seconds field (Linux crontab does not).

* * * * * *
┬ ┬ ┬ ┬ ┬ ┬
│ │ │ │ │ │
│ │ │ │ │ └─ Day of the week (0 - 7) (0 or 7 is Sunday)
│ │ │ │ └─── Month (1 - 12)
│ │ │ └───── Day of the month (1 - 31)
│ │ └─────── Hour (0 - 23)
│ └───────── Minute (0 - 59)
└─────────── Second (0 - 59, optional)

module.exports = {
  schedule: {
    // Execute on the hour every three hours
    cron: '0 0 */3 * * *',
  },
};
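
The field order in the diagram can be checked with a small sketch that labels each field of an expression. The helper labelCronFields is hypothetical and only splits the string; it does not validate ranges or compute run times the way cron-parser does. It also assumes that an omitted seconds field behaves like 0.

```javascript
const FIELD_NAMES = ['second', 'minute', 'hour', 'dayOfMonth', 'month', 'dayOfWeek'];

// Hypothetical helper: maps each whitespace-separated field to its name.
function labelCronFields(expression) {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== 5 && parts.length !== 6) {
    throw new RangeError(`Expected 5 or 6 fields, got ${parts.length}`);
  }
  // Assumption: a 5-field expression has its seconds field treated as 0
  if (parts.length === 5) parts.unshift('0');
  return Object.fromEntries(FIELD_NAMES.map((name, i) => [name, parts[i]]));
}

// Six fields: the example above, every three hours on the hour
console.log(labelCronFields('0 0 */3 * * *'));
// Five fields, crontab style: 02:30 every Monday
console.log(labelCronFields('30 2 * * 1'));
```

Reading the labeled output for '0 0 */3 * * *' shows why the expression fires on the hour: both second and minute are fixed at 0, and only hour carries the */3 step.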

Types

The framework's scheduled tasks support two types by default: worker and all. Both types support the two scheduling methods above; they differ only in which workers run the task when it fires:

  • worker type: Only one worker on each machine will execute this scheduled task, and that worker is chosen at random on each run.
  • all type: Every worker on each machine will execute this scheduled task.
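
The difference between the two types can be pictured with a conceptual model. The function selectWorkers below is purely illustrative and is not the framework's dispatch code; it only mirrors the rule described above for the workers on a single machine.

```javascript
// Conceptual model only: returns the worker ids that would run a task on one machine.
// The random parameter is injectable so the choice can be made deterministic.
function selectWorkers(type, workerIds, random = Math.random) {
  if (type === 'all') return [...workerIds];
  if (type === 'worker') {
    const index = Math.floor(random() * workerIds.length);
    return [workerIds[index]];
  }
  throw new Error(`Unknown schedule type: ${type}`);
}

console.log(selectWorkers('all', [1, 2, 3, 4])); // every worker runs the task
console.log(selectWorkers('worker', [1, 2, 3, 4])); // exactly one worker, picked at random
```

As a rule of thumb under this model, per-process state such as an in-memory cache suits type all, while work with external side effects, such as reporting or file cleanup, usually suits type worker so that it is not repeated by every process.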

Other Parameters

In addition to the parameters mentioned above, scheduled tasks also support the following parameters:

  • cronOptions: Configures the timezone for cron, etc. Refer to the cron-parser documentation.
  • immediate: When this parameter is set to true, the scheduled task will execute immediately once after the application starts and is ready.
  • disable: When this parameter is set to true, the scheduled task will not be started.
  • env: An array that specifies the environments in which the scheduled task should be started.
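
The following sketch combines these parameters in one task definition. The file name, the time zone, and the log message are assumptions chosen for illustration; tz is one of the options documented by cron-parser.

```javascript
// app/schedule/nightly_report.js (hypothetical file name)
const nightlyReport = {
  schedule: {
    cron: '0 30 2 * * *', // second 0, minute 30, hour 2: 02:30:00 every day
    cronOptions: { tz: 'Asia/Shanghai' }, // cron-parser option: evaluate the expression in this time zone
    type: 'worker', // one worker per machine
    immediate: true, // also run once right after the application has started and is ready
    disable: false, // set to true to keep the task from being started
    env: ['prod'], // start the task only in the listed environments
  },
  async task(ctx) {
    ctx.logger.info('nightly report triggered');
  },
};

module.exports = nightlyReport;
```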

Execution Logs

Execution logs are written to ${appInfo.root}/logs/${appInfo.name}/egg-schedule.log and, by default, are not printed to the console. You can customize this through config.customLogger.scheduleLogger.

// config/config.default.js
config.customLogger = {
  scheduleLogger: {
    // consoleLevel: 'NONE',
    // file: path.join(appInfo.root, 'logs', appInfo.name, 'egg-schedule.log'),
  },
};
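
As one possible customization, the sketch below also prints schedule logs to the console and spells out the log file path. The function name buildConfig is only there to keep the example self-contained, and the choice of the INFO level is an assumption for illustration.

```javascript
// config/config.default.js
const path = require('path');

// appInfo is supplied by the framework when the config file exports a function
function buildConfig(appInfo) {
  const config = {};

  config.customLogger = {
    scheduleLogger: {
      // Print schedule logs at INFO level and above to the console as well
      consoleLevel: 'INFO',
      // Same location as the default described above, written out explicitly
      file: path.join(appInfo.root, 'logs', appInfo.name, 'egg-schedule.log'),
    },
  };

  return config;
}

module.exports = buildConfig;
```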

Dynamic Configuration of Scheduled Tasks

Sometimes the parameters of a scheduled task need to come from the application configuration. For this, a schedule file can export a function that receives app and returns the task definition:

module.exports = (app) => {
  return {
    schedule: {
      interval: app.config.cacheTick,
      type: 'all',
    },
    async task(ctx) {
      const res = await ctx.curl('http://www.api.com/cache', {
        dataType: 'json',
      });
      ctx.app.cache = res.data;
    },
  };
};
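
To show where the value comes from, the sketch below pairs the function form with a cacheTick setting. The value '30s' and the fakeApp object are assumptions for illustration; in a real application the framework passes in app, and cacheTick would be defined in config/config.default.js.

```javascript
// config/config.default.js would define the value, for example:
//   config.cacheTick = '30s';

// app/schedule/update_cache.js
function createUpdateCacheTask(app) {
  return {
    schedule: {
      interval: app.config.cacheTick, // read once, when this function is called
      type: 'all',
    },
    async task(ctx) {
      const res = await ctx.curl('http://www.api.com/cache', {
        dataType: 'json',
      });
      ctx.app.cache = res.data;
    },
  };
}

module.exports = createUpdateCacheTask;

// Stand-in for the framework's app object, used only to show the wiring
const fakeApp = { config: { cacheTick: '30s' } };
console.log(createUpdateCacheTask(fakeApp).schedule.interval); // prints 30s
```

Because the interval lives in configuration, each environment can use a different value through its own config file without touching the task itself.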

Manually Executing Scheduled Tasks

We can run a scheduled task using app.runSchedule(schedulePath). app.runSchedule accepts the path of a scheduled task file, either relative to the app/schedule directory or absolute, executes the corresponding scheduled task, and returns a Promise.

In the following scenarios, we may need to manually execute scheduled tasks:

  • Unit tests: running a scheduled task manually makes it straightforward to write unit tests for it.

const mm = require('egg-mock');
const assert = require('assert');

it('should schedule work fine', async () => {
  const app = mm.app();
  await app.ready();
  await app.runSchedule('update_cache');
  assert(app.cache);
});

  • System initialization: when the application starts, we can run a scheduled task manually to prepare data, and let the application start only after that completes. For specifics, refer to the Custom Application Startup section. The initialization logic can be written in app.js.

module.exports = (app) => {
  app.beforeStart(async () => {
    // Ensure that data is ready before the application starts listening on the port
    // Subsequent data updates will be automatically triggered by scheduled tasks
    await app.runSchedule('update_cache');
  });
};

Extending Scheduled Task Types

The default scheduled task types only cover single-process execution and all-process execution on one machine. In some cases, such as when a service is deployed across several machines, we may need exactly one process in the whole cluster to execute a scheduled task.

The framework does not provide this functionality directly, but developers can add new scheduled task types in an upper-level framework.

In agent.js, extend agent.ScheduleStrategy, and then register the subclass using agent.schedule.use():

module.exports = (agent) => {
  class ClusterStrategy extends agent.ScheduleStrategy {
    start() {
      // Subscribe to messages sent by other distributed scheduling services, and let one process execute the scheduled task upon receiving a message
      // Users can configure the distributed scheduling scenario (scene) in the schedule property of the scheduled task
      agent.mq.subscribe(this.schedule.scene, () => this.sendOne());
    }
  }
  agent.schedule.use('cluster', ClusterStrategy);
};

The ScheduleStrategy base class provides the following members:

  • this.schedule - The schedule properties of the task. The disable property is supported by default; any other custom properties are left for the strategy to parse itself.
  • this.sendOne(...args) - Notifies one randomly chosen worker to execute the task; args are passed on to subscribe(...args) or task(ctx, ...args).
  • this.sendAll(...args) - Notifies all workers to execute the task.
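
On the task side, a schedule file opts into such a custom type through its schedule properties. The sketch below is hypothetical: the file name, the scene value, and the message argument are illustrative, and it assumes the strategy forwards the received message, for example with this.sendOne(message) rather than this.sendOne().

```javascript
// app/schedule/cluster_sync.js (hypothetical file name)
const clusterSync = {
  schedule: {
    type: 'cluster', // the custom type registered with agent.schedule.use('cluster', ...)
    scene: 'cache-sync', // custom property, read by the strategy as this.schedule.scene
  },
  // Arguments given to sendOne(...args) or sendAll(...args) arrive after ctx
  async task(ctx, message) {
    // Assumed behavior for illustration: remember the last message that triggered the task
    ctx.app.lastClusterMessage = message;
  },
};

module.exports = clusterSync;
```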