ASYNC

INTRO

The n2o_async module dedicated for creating and tracking supervised processes across all applications, using any ETS tables. Any supervised process in N2O is created using n2o_async, such as: ring vnodes, timers, authorization, web page processes, test processes and other services. Loop process inside info/2 protocol handlers should spawn new async processes proc/2 in case of time consuming opetations, because protocol handler is a critical path and it should be handled as soon as possible.

CALLBACK

proc(term(),#handler{}) -> #ok{} | #reply{}.

The proc/2 is a callback that will be called on each gen_server's calls: handle_call, handle_cast and handle_info, its init and terminate. It returns either #ok as initial state of the process (which is the #handler{} too) or its response to gen_server:call/2 with new state included in #reply.

EXAMPLE

Here is literal implementation of N2O Timer which invalidates the caching table used for session variables.

Listing 1. Invalidate Cache by Timer
proc(init,#handler{}=Async) -> {ok,Async#handler{state=timer(ping())}}; proc({timer,ping},#handler{state=Timer}=Async) -> erlang:cancel_timer(Timer), io:format("n2o Timer: ~p\r~n",[ping]), n2o:invalidate_cache(caching), {reply,ok,Async#handler{state=timer_restart(ping())}}. timer(Diff) -> {X,Y,Z} = Diff, erlang:send_after(1000*(Z+60*Y+60*60*X),self(),{timer,ping}). ping() -> application:get_env(n2o,timer,{0,1,0}).
Listing 1. Invalidate Cache by Timer
> n2o_async:start(#handler{ module=n2o, class=caching, group=n2o, state=[], name="timer"}).

Main purpose of n2o_async is to create such processes from single proc/2 function and track pid in ETS table which is specified during process #handler{} initialization.

Listing 2. Understaning n2o_async
1> supervisor:which_children(n2o). [{{ring,4},<0.1661.0>,worker,[n2o_vnode]}, {{ring,3},<0.1655.0>,worker,[n2o_vnode]}, {{ring,2},<0.1653.0>,worker,[n2o_vnode]}, {{ring,1},<0.1651.0>,worker,[n2o_vnode]}, {{caching,"timer"},<0.1604.0>,worker,[n2o]}] 2> ets:tab2list(ring). [{{ring,4},infinity,<0.1661.0>}, {{ring,1},infinity,<0.1651.0>}, {{ring,2},infinity,<0.1653.0>}, {{ring,3},infinity,<0.1655.0>}] 3> ets:tab2list(caching). [{{caching,"timer"},infinity,<0.1604.0>}] 4> n2o_async:send(caching,"timer",{timer,ping}). n2o Timer: ping ok 5> n2o_async:pid(caching,"timer"). <0.1604.0>

RECORDS

Each process is driven by its protocol which in fact a sum of protocol messages. Though n2o_async as being generic don't limit the protocol messages, however it defines the type of process state, the #handler{} record.

Listing 3. Erlang/OTP Records
#ok { code = [] :: [] | #handler{} }. #error { code = [] :: [] | term() }. #reply { data = [] :: [] | term() , code = [] :: [] | #handler{} }.

According to N2O agreement each protocol message filed should include [] in its type as default nil.

Listing 4. N2O Records
#handler { name = [] :: [] | term(), module = [] :: [] | atom(), class = [] :: [] | atom(), group = [] :: [] | atom(), config = [] :: [] | term(), state = [] :: [] | term(), seq = [] :: [] | integer() }.

API

start(#handler{}) -> {pid(),term()} | #error{}.

Spawns proc/2 function inside a process under supervision.

stop(Class,Name) -> #handler{} | #error{}.

Kills the process and remove from supervision.

restart(Class,Name) -> {pid(),term()} | #error{} | #handler{}.

Tries to stop the process. On success it starts the new one, else return error.

send(Class,Name,term()) -> term().

Sends gen_call message to process, taken from Class table with Name key. Returns the response from gen_server:call.

pid(Class,Name) -> pid().

Returns pid that was stored during process initialization in Class table with Name key.

Listing 5. gen_server compatibility.
1> n2o_async:pid(caching,"timer") ! {timer,ping}. n2o Timer: ping {timer,ping} 2> n2o_async:send(caching,"timer", {timer,ping}). n2o Timer: ping ok 3> gen_server:call( n2o_async:pid(caching,"timer"), {timer,ping}). n2o Timer: ping ok

This module may refer to: n2o, n2o_proto, n2o_vnode, gen_server, supervisor.