musings on crossbar, docker, guest, and rest bridging - a task management system

#1

I’ve been putting together a task management system that is constructed around crossbar relaying rpc and pub/sub messages to several components and services. Here is a quick list of the tech involved:

  • linux - ubuntu 16.04 LTS

  • crossbar - latest via docker 17.6.1-3

  • supervisord - process management

  • beanstalkd - task queue

  • cron - scheduled jobs at set times

And in the mix is also:

  • apache - webserver

  • php 7 - main backend language

  • mongodb - backend storage

  • mysql - backend storage

  • javascript - main frontend language

Here’s how everything works …

HTTP CALLER (BRIDGING)

At predefined times, cron will run a command line script that will issue RPC calls into crossbar. I’ve experimented with various ways to do this, but since I don’t want to write a lot of my own code and I don’t care about the RPC responses, I’m using rest bridging inside crossbar. Crossbar listens on a port as an HTTP service and the command line scripts simply connect to this port, make the RPC call, and receive a response.

http://crossbar.io/docs/HTTP-Bridge/

I wish the page above used a few graphics to explain things, because I had to read it multiple times to figure out whether I’m using the HTTP Caller or the HTTP Callee in this configuration. The English on the page just distinguishes “via” vs “to”, and that’s not enough to quickly indicate which direction we are going. I’m pretty sure it’s the HTTP Caller and HTTP Publisher that I’ve configured with this transport rule:

```json
{
    "id": "web9080",
    "type": "web",
    "endpoint": {
        "type": "tcp",
        "port": 9080
    },
    "paths": {
        "call": {
            "type": "caller",
            "realm": "realm1",
            "role": "backend"
        },
        "publish": {
            "type": "publisher",
            "realm": "realm1",
            "role": "backend"
        }
    }
}
```
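
The command line scripts that cron runs are then just small HTTP clients POSTing against this transport. A minimal sketch in PHP, assuming the /call path above (the procedure name and arguments are placeholders):

```php
<?php
// cron-invoked sketch: make one RPC call through crossbar's HTTP Caller bridge.
// Assumes the "call" path on port 9080 from the transport above; the procedure
// name and arguments are placeholders.

$payload = json_encode([
    'procedure' => 'com.example.dante.sleeper',   // placeholder uri
    'args'      => [30],                          // placeholder args
    'kwargs'    => new stdClass(),                // no keyword arguments
]);

$ch = curl_init('http://127.0.0.1:9080/call');
curl_setopt_array($ch, [
    CURLOPT_POST           => true,
    CURLOPT_POSTFIELDS     => $payload,
    CURLOPT_HTTPHEADER     => ['Content-Type: application/json'],
    CURLOPT_RETURNTRANSFER => true,
]);

// fire the call; I don't actually care about the response
$response = curl_exec($ch);
curl_close($ch);

echo $response, PHP_EOL;
```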

BACKEND WAMP CLIENTS

On the backend, I have several PHP scripts that connect to crossbar and register uris that can be called. For the sake of organizing my API calls, I have broken my uris into namespaces under the top-level uri (com.example), and each namespace is registered by a separate backend client. So, for example, we might have all of the following uris:

  • com.example.widget.* - registered by backend client “wamp.php com.example.widget”

  • com.example.thing.* - registered by backend client “wamp.php com.example.thing”

  • com.example.bobble.* - registered by backend client “wamp.php com.example.bobble”

In this way, I have a single backend client written in PHP (wamp.php) that registers all of the rpc calls prefixed with com.example.widget (a sketch of this client follows the next list). So, for example, it would register all of the following:

  • com.example.widget.func_a

  • com.example.widget.do_that_other_thing

  • com.example.widget.other.more_func
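
A stripped-down sketch of what such a wamp.php client might look like, using the Thruway client (the uri list and handler bodies are placeholders, not my real API map):

```php
<?php
// wamp.php sketch: one backend client registering every uri in one namespace.
// Uses the Thruway client; the uri list and handler bodies are placeholders.

require __DIR__ . '/vendor/autoload.php';

use Thruway\ClientSession;
use Thruway\Connection;

$namespace = $argv[1] ?? 'com.example.widget';   // e.g. "php wamp.php com.example.widget"

$connection = new Connection([
    'realm' => 'realm1',
    'url'   => 'ws://127.0.0.1:8080/ws',
]);

$connection->on('open', function (ClientSession $session) use ($namespace) {
    // in the real client this list comes from the API map
    $uris = [
        $namespace . '.func_a',
        $namespace . '.do_that_other_thing',
    ];

    foreach ($uris as $uri) {
        $session->register($uri, function ($args, $kwargs, $details) use ($uri) {
            // placeholder: dispatch to the task class mapped to $uri
            return [$uri . ' was called'];
        });
    }
});

$connection->open();
```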

BACKGROUND vs FOREGROUND TASKS

Most of the RPC calls we are writing are long-running and therefore blocking. For these calls, I really want to detach the process and run it in the background. However, I can’t just detach every incoming call at once or I might overload the server, so I also need task queuing, and I would like to process the task queue with a preset number of workers. I looked into crossbar’s microservices support here:

https://github.com/crossbario/crossbar-examples/tree/master/scaling-microservices

But this didn’t quite give me the results I was looking for, because I am using PHP and Thruway as my backend client. When multiple calls are sent into PHP Thruway, only one call is actually processed at a time, and any synchronous code makes the entire wamp client process hang until that code completes. So I can’t use crossbar’s microservices approach for my background tasks. Instead, in PHP, I tag each uri as either a ‘foreground’ or a ‘background’ task. Then, in the wamp.php client, any time a background uri is called, I take the WAMP parameters, wrap them into a “job”, stick that job into beanstalkd as a queued task, and return immediately:

```php
// register as a client procedure
$session->register(
    $uri,
    function ($args, $kwargs, $details) use ($uri, $client) {
        $input = new TaskInput($uri, $args, $kwargs, $details);
        return $this->invokeBackgroundTask($input, $client);
    }
);
```

And … to invoke …

```php
public function invokeBackgroundTask(TaskInput $input, TaskClient $client)
{
    // $queue (the beanstalkd client) and $priority are provided by the
    // surrounding class; shown as-is here for brevity.

    // convert task input into a queue job
    $job = [
        'uri'      => $input->getUri(),
        'args'     => $input->getArgs(),
        'kwargs'   => $input->getKwargs(),
        'details'  => $input->getDetails(),
        'priority' => $priority
    ];

    // insert into beanstalkd tube
    $job_id = $queue->put(json_encode($job), $priority);

    // announce that we added a new job!
    $client->getSession()->publish('com.example.queue.notify', null, ['type' => 'add', 'job' => $job_id]);

    // return the job id we inserted
    return new Result([$job_id]);
}
```

After sticking all the parameters into beanstalkd, the RPC call will immediately return the new job_id that was assigned by beanstalkd. In the future, I might expand on this design to allow the job to be monitored and have the results returned to the calling client, but for now it suffices that by calling this “background” uri, the job is simply “queued” and the call returns immediately.

Meanwhile, foreground RPC calls simply invoke the “task” class mapped to the uri, run it in the foreground, and return the results of that task as the RPC result.
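
The foreground path is roughly the mirror image of invokeBackgroundTask above. A sketch only (the $api_classes lookup and the TaskResult getters are placeholders for however the uri is mapped to its class):

```php
// Sketch only: a method on the same wamp.php client class as
// invokeBackgroundTask above. $api_classes and the TaskResult getters are
// placeholders for however the uri is mapped to its task class.
public function invokeForegroundTask(TaskInput $input)
{
    // resolve the uri to its TaskInterface implementation
    $class = self::$api_classes[$input->getUri()];

    /** @var TaskInterface $task */
    $task = new $class();

    // run synchronously; the client blocks until the task returns
    $output = $task->run($input);

    // hand the task output back as the WAMP result
    return new Result($output->getArgs(), $output->getKwargs());
}
```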

PROCESSING QUEUED CALLS

I’m using supervisord to run a pool of worker.php processes. These are command line processes that monitor the beanstalkd queue for jobs, pop one off, “process” the task, and exit. I wrap this PHP script in a bash script so that the PHP runs repeatedly. I find that when PHP comes to life, runs, and dies completely, it does a much better job with memory and resource management. It’s similar to how Apache works by forking processes to handle HTTP requests. Spawning PHP jobs and having them die isn’t as efficient as forking processes, though, so I think I can still improve on this part.

An example of my worker.conf for supervisor:

```ini
[program:worker]
command = bash /opt/bin/worker.bash
numprocs = 5
process_name = %(program_name)s_%(process_num)02d
autostart = true
autorestart = true
environment = X_ENV=dev
user = www-data
redirect_stderr = true
```

This will spawn 5 workers that process the task queue. If there is nothing to do, a worker will block and wait for a job. Once a job is received, it processes the job and dies, and a new worker is spawned to replace it. The worker.bash looks like this:

```bash
#!/bin/bash
cd "$(dirname "$0")"

# POSSIBLE EXIT CODES
#   0 - success (restart immediately)
#   * - anything else is an unplanned restart

# run the PHP CLI and check the exit status
nice /usr/bin/php -q ./worker.php
ERR=$?

# EXIT(0) - successful run - restart immediately
if [ ${ERR} -eq 0 ]
then
    exec /bin/bash $0 $@
fi

# ERROR - unplanned exit, pause briefly, then restart
echo "unplanned restart: err:" ${ERR}
sleep 1
exec /bin/bash $0 $@
```
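
For completeness, worker.php itself is little more than a reserve/run/delete loop against beanstalkd. A sketch, assuming the pda/pheanstalk client (the task dispatch in the middle is a placeholder):

```php
<?php
// worker.php sketch: reserve one job from beanstalkd, run it, delete it, exit.
// Assumes the pda/pheanstalk client; the task dispatch is a placeholder.

require __DIR__ . '/vendor/autoload.php';

use Pheanstalk\Pheanstalk;

$queue = Pheanstalk::create('127.0.0.1');

// block until a job becomes available
$job = $queue->reserve();

$payload = json_decode($job->getData(), true);

try {
    // placeholder: resolve $payload['uri'] to its TaskInterface class and run it
    // with a TaskInput built from $payload['args'] / $payload['kwargs'] / $payload['details']

    $queue->delete($job);
    exit(0);            // success: worker.bash restarts us immediately
} catch (Throwable $e) {
    $queue->bury($job); // keep the failed job around for inspection
    exit(1);            // non-zero: worker.bash pauses before restarting
}
```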

TASK INTERFACE

Since I have some RPC calls running in the foreground and other RPC calls running in the background, I wanted an easy way to write code that can be called either way, so I created a PHP interface:

```php
<?php

namespace Dante\Task;

interface TaskInterface
{
    /**
     * @param TaskInput $input
     *
     * @return TaskResult
     */
    public function run(TaskInput $input): TaskResult;
}
```

An RPC call in WAMP is basically a function with input and output, so that’s what this interface is. The TaskInput object contains all the parameters like uri, args, kwargs, and details, and the TaskResult contains the output like args and kwargs. Now, all of my API calls just need to implement this interface and voilà, I can quickly expand my API library. For example, here is the get_free_space RPC call:

```php
<?php

namespace Dante\Task\Example;

use Dante\Task\TaskInput;
use Dante\Task\TaskInterface;
use Dante\Task\TaskResult;

class GetFreeSpace implements TaskInterface
{
    /**
     * @param TaskInput $input
     *
     * @return TaskResult
     */
    public function run(TaskInput $input): TaskResult
    {
        // read free space
        $bytes = disk_free_space('/');

        // result
        return new TaskResult([$bytes]);
    }
}
```
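
The TaskInput and TaskResult classes are little more than value objects around those fields. A stripped-down sketch, consistent with how they are used above (the real classes carry a bit more than this):

```php
<?php

namespace Dante\Task;

// Stripped-down sketch only; the real classes carry a bit more than this.

class TaskInput
{
    private $uri;
    private $args;
    private $kwargs;
    private $details;

    public function __construct(string $uri, $args = [], $kwargs = null, $details = null)
    {
        $this->uri     = $uri;
        $this->args    = $args;
        $this->kwargs  = $kwargs;
        $this->details = $details;
    }

    public function getUri(): string { return $this->uri; }
    public function getArgs()        { return $this->args; }
    public function getKwargs()      { return $this->kwargs; }
    public function getDetails()     { return $this->details; }
}

class TaskResult
{
    private $args;
    private $kwargs;

    public function __construct($args = [], $kwargs = [])
    {
        $this->args   = $args;
        $this->kwargs = $kwargs;
    }

    public function getArgs()   { return $this->args; }
    public function getKwargs() { return $this->kwargs; }
}
```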

In my API map, I might define this task as follows:

```php
// api map
public static $api = [

    // dante
    'com.example.dante.get_free_space' => self::PROCEDURE,
    'com.example.dante.sleeper'        => self::TASK,

];
```

As you can see, the ‘com.example.dante.get_free_space’ task will run in the foreground (self::PROCEDURE), but the ‘com.example.dante.sleeper’ task will be queued and run by a worker in the background (self::TASK). By simply changing the mapping, the same code will run either way. Both foreground and background RPC calls use this TaskInterface to invoke RPC functions.
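
The decision itself happens in the registered handler: look the uri up in the api map and either queue it or run it inline. A sketch of that dispatch (it lives inside the wamp.php client class, alongside the invoke methods above):

```php
// Sketch of the dispatch inside the registered handler (it lives in the
// wamp.php client class, next to the invoke methods above).
$handler = function ($args, $kwargs, $details) use ($uri, $client) {
    $input = new TaskInput($uri, $args, $kwargs, $details);

    if (self::$api[$uri] === self::TASK) {
        // background: queue the job into beanstalkd and return the job id
        return $this->invokeBackgroundTask($input, $client);
    }

    // foreground (self::PROCEDURE): run the mapped task class inline
    return $this->invokeForegroundTask($input);
};

$session->register($uri, $handler);
```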

ABOUT DOCKER AND GUEST

My backend clients need to run whenever crossbar is running. Ideally, they start just after crossbar starts, so it made sense to use the ‘guest’ feature built into crossbar to run these clients. The guest configuration is something like this:

```json
{
    "id": "guest-dante",
    "type": "guest",
    "executable": "/usr/bin/env",
    "arguments": [
        "php",
        "/opt/bin/wamp.php",
        "com.example.dante"
    ]
}
```

But, recently I have switched to using Docker for running Crossbar. My docker create statement looks like this:

```bash
docker create \
    -u root \
    -v /home/dante/crossbar:/node \
    -v /etc/letsencrypt:/etc/letsencrypt \
    -p 8080:8080 \
    -p 127.0.0.1:9001:9001 \
    -p 127.0.0.1:9080:9080 \
    --name crossbar \
    crossbario/crossbar
```

The problem is that this docker container doesn’t have PHP installed and also can’t find the /opt/bin/wamp.php script! I can mount the /opt/bin directory just fine, but that doesn’t solve the PHP problem, and … where exactly would this backend client be running? Should it run inside the docker container or on my host machine? I feel like by migrating over to Docker, we have made the ‘guest’ feature useless. So, I’ve had to resort to running my backend clients under supervisor, similar to how I run my worker.php anyway. Here is a sample crossbar-dante.conf:

```ini
[program:crossbar-dante]
command = php /opt/bin/wamp.php com.example.dante
numprocs = 1
process_name = %(program_name)s_%(process_num)02d
autostart = true
autorestart = true
environment = X_ENV=dev
user = www-data
redirect_stderr = true
```

But, did you also notice that line “user = www-data” in there? It turns out I’d also like to run these workers/jobs/tasks as the same uid that apache uses, so that I don’t have permission issues accessing files and resources. So, supervisor helps me map the user for the processes it runs.

CAN WE JUST USE APACHE OR NGINX?

When I run a task, I either want to do it in the foreground or in the background. I’m probably going to use PHP for all my backend processes. A web server already knows how to listen on a socket, fork processes, process requests, and issue responses. “REST” works very well through a web server, and crossbar has an HTTP Callee Bridge that would allow me to issue WAMP requests and get routed to an HTTP endpoint:

http://crossbar.io/docs/HTTP-Bridge-Callee/

I don’t really like how this bridge is configured currently, however, because it messes with how WAMP calls appear to clients.

With normal WAMP RPC, you just have rpc calls as such:

uri(args, kwargs, details) --> [args, kwargs]

But, with the HTTP callee, you can only register a single URI and you extend that URI by passing additional parameters:

  • method
  • url
  • body
  • headers
  • params

I would rather that the configuration of an HTTP Callee Bridge appear seamless and allow us to configure all of these parameters in the crossbar config instead of on a per-call basis. Something like this might be better:
```json
{
    "type": "container",
    ...
    "components": [
        {
            "type": "class",
            "classname": "crossbar.adapter.rest.RESTCallee",
            "realm": "realm1",
            "extra": {
                "procedure": "com.example.dante.*",
                "url": "https://api.example.com/dante",
                "method": "POST",
                "headers": {
                    "Content-type": "application/json"
                },
                "params": {
                    "color": "blue"
                }
            },
            "transport": {
                "type": "websocket",
                "endpoint": {
                    "type": "tcp",
                    "host": "127.0.0.1",
                    "port": 8080
                },
                "url": "ws://127.0.0.1:8080/ws"
            }
        }
    ]
}
```

Then, just pass in args, kwargs, and details as POST params to the endpoint and receive back args and kwargs as json output.

Notice I also use a WILDCARD to register the procedures that are handled by this url. The full ‘uri=com.example.dante.test.myfunc’ can be passed as an additional parameter. Now, we can write RPC handlers in PHP on our web server and don’t need to have custom wamp clients for RPC/REST functionality.
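
Under that proposed setup, the apache-side handler could be a plain PHP endpoint along these lines (purely hypothetical, since the bridge doesn’t pass parameters this way today; the payload shape and names are my assumptions):

```php
<?php
// Hypothetical apache/php endpoint for the proposed callee bridge: receive
// uri/args/kwargs/details as a JSON POST body and return args/kwargs as JSON.
// Nothing here is an existing crossbar feature; names and shapes are assumptions.

header('Content-Type: application/json');

$request = json_decode(file_get_contents('php://input'), true);

$uri     = $request['uri']     ?? '';
$args    = $request['args']    ?? [];
$kwargs  = $request['kwargs']  ?? [];
$details = $request['details'] ?? [];

// placeholder: resolve $uri to its TaskInterface class and run it
// $result = (new GetFreeSpace())->run(new TaskInput($uri, $args, $kwargs, $details));

// echo back args/kwargs as the RPC result
echo json_encode([
    'args'   => [$uri . ' handled'],   // placeholder result
    'kwargs' => new stdClass(),
]);
```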

With better HTTP Callee Bridging built into crossbar, I could go back to writing all my code as REST apis (implementing the WAMP HTTP protocol) and use crossbar in a very lightweight bridging role only. Could it possibly enable me to replace a lot of this architecture?

Consider:

  • cron calls crossbar’s http caller bridge to invoke RPC for a job that needs to be run in the background
  • crossbar runs the http callee bridge to reach the apache/php REST api with the RPC request for the background job
  • apache inserts the RPC job details into the background beanstalkd queue and returns a job_id to the crossbar callee
  • supervisor runs a much simplified worker.php process to pop jobs off the work queue and call crossbar RPC again (with a request for foreground mode)
  • crossbar runs the http callee bridge to reach apache/php REST to process the RPC request again, this time forced into the foreground

If you continue this design thought process … maybe that beanstalkd queue can be embedded into crossbar, and that worker.php process can be embedded into crossbar also.

FINALLY

What we need is an option to “queue and detach” an RPC call in crossbar, with crossbar managing a pool of resources that invoke REST bridging to process those RPC commands.

Crossbar essentially becomes a simple pub/sub router and REST bridge capable of rpc in the foreground or queued/parallel RPC in the background.

What do you think of this?

– Dante


#2

It looks like I may have found something like what I’m after here …

https://github.com/voryx/SlackWamp

This is a custom http bridge client written using the RxPHP Thruway client and observables. There’s a good example of an HTTP APIBridge using Slack that I might be able to modify to call my own HTTP API.

I’ll need to test this a bit before I can comment on whether it’s great or not. I’m wondering how it will perform … and whether the http API calls will be handled serially or in parallel. I see it is using Rx\React\Http, so maybe it can be async.

– Dante
