Total Pageviews

Monday, 11 July 2016

基于go的Periodic task system


What is Periodic?

Periodic is a Gearman like task system, provides a generic application framework to do the periodic work.

How Does Periodic work?

A Periodic powered application consists of three parts: a client, a worker, and a periodic server. The client is responsible for creating a job to be run and sending it to a periodic server. The periodic server will find a suitable worker that can run the job when then job time is up. The worker performs the work requested by the client and sends a stat to the periodic server. Periodic provides client and worker APIs that your applications call to talk with the periodic server (also known as periodic -d) so you don’t need to deal with networking or mapping of jobs.
Internally, the periodic client and worker APIs communicate with the periodic server using TCP sockets. To explain how Periodic works in more detail, lets look at a simple application that will print a string on a random delay. The example is given in Python3, although other APIs will look quite similar.
We start off by writing a client application that is responsible for sending off the job. It does this by using the Periodic client API to send some data associated with a function name, in this case the function random_print. The code for this is:
from aio_periodic import Client
from time import time
client = Client()
yield from client.connect()
job = {
    "name": "Hello world!",
    "func": "random_print",
    "sched_at": int(time()),
yield from client.submitJob(job)
This code initializes a client class, configures it to use a periodic server with add_server, and then tells the client API to run the random_print function with the workload “Hello world!”. The function name and arguments are completely arbitrary as far as Periodic is concerned, so you could send any data structure that is appropriate for your application (text or binary). At this point the Periodic client API will package up the job into a Periodic protocol packet and send it to the periodic server to find an appropriate worker that can run the random_print function. Let’s now look at the worker code:
from aio_periodic import Worker
import random

worker = Worker()
yield from worker.connect()
yield from worker.add_func("random_print")
count = 0
while True:
    job = yield from worker.grabJob()
    count += 1
    if count > 10:
        yield from job.done()

    yield from job.sched_later(random.randint(5, 10))

This code defines a function random_print that takes a string and print the string on a random delay for 10 times. It is used by a worker object to register a function named random_print after it is setup to connect to the same periodic server as the client. When the periodic server receives the job to be run, it looks at the list of workers who have registered the function name random_print and forwards the job on to one of the free workers. The Periodic worker API then takes this request, runs the function random_print, and sends the job stat of that function back to the periodic server.
Random print
As you can see, the client and worker APIs (along with the periodic server) deal with the job management and network communication so you can focus on the application parts.

Quick start


go get -v
periodic -h

Start periodic server

$ periodic -d

A worker to ls a dirctory every five second.

$ vim
#!/usr/bin/env bash
ls -lrth $@
echo "SCHED_LATER 5" # tell periodic do the job 5 second later
# echo "DONE" # tell periodic the job is done
# echo "FAIL" # tell periodic the job is fail

$ chmod +x

$ periodic run -f ls5 --exec `pwd`/

Submit a job

$ periodic submit -f ls5 -n /tmp/


Periodic clients

curl http://ip:port                      # Show the status of periodic
curl http://ip:port/[funcName]           # Show the status of a func
curl -X DELETE http://ip:port/[funcName] # delete the func

curl -d func=[funcName] -d name=[jobName] -d args=[jobArgs] -d timeout=[timeout] -d sched_at=[schedAt] http://ip:port # submit a job
curl -d name=[jobName] -d args=[jobArgs] -d timeout=[timeout] -d sched_at=[schedAt] http://ip:port/[funcName]         # submit a job
curl -d name=[jobName] -d act=remove http://ip:port/[funcName]                     # remove a job
curl -d name=[jobName] -d func=[funcName] -d act=remove http://ip:port/[funcName]  # remove a job