Module Sdflow


module Sdflow: sig .. end
Structured DataFlow, a high-level combinatory dataflow programming library based upon destructive lazy streams. The base type flow is compatible with Stream.t from the OCaml standard library.

Unless otherwise specified, all flow manipulation functions are lazy.


type 'a flow = 'a Stream.t 
exception End_of_flow
End_of_flow = Stream.Failure

Conversion interfaces

These are almost identical to the corresponding functions of Stream.

val of_fun : (unit -> 'a) -> 'a flow
of_fun f takes a generator f and produces a flow whose elements are obtained by calling f () repeatedly. f should raise End_of_flow to signal the end of the flow.
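As an illustration of the generator protocol, here is a minimal sketch that does not depend on the library: a stateful counter closure that raises an exception to signal the end, and a helper that drains it the way to_list (of_fun g) is described to. The exception End_of_flow below is a local stand-in for Sdflow.End_of_flow, and counter_upto and drain are hypothetical names, not part of the interface.

```ocaml
(* Local stand-in for Sdflow.End_of_flow (which the interface aliases to Stream.Failure). *)
exception End_of_flow

(* A generator of the kind of_fun expects: yields 1, 2, ..., n, then signals the end.
   It is called repeatedly, so it carries its own state in a ref. *)
let counter_upto n =
  let i = ref 0 in
  fun () ->
    if !i >= n then raise End_of_flow
    else begin incr i; !i end

(* Hypothetical helper: collects everything the generator yields until it raises End_of_flow. *)
let drain (g : unit -> 'a) : 'a list =
  let rec loop acc =
    match g () with
    | x -> loop (x :: acc)
    | exception End_of_flow -> List.rev acc
  in
  loop []
```

With the library itself, the corresponding call would be Sdflow.to_list (Sdflow.of_fun (counter_upto 3)), provided the generator raises Sdflow.End_of_flow rather than the local stand-in.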
val of_list : 'a list -> 'a flow
val of_string : string -> char flow
val of_channel : Pervasives.in_channel -> char flow
val to_fun : 'a flow -> unit -> 'a
val to_list : 'a flow -> 'a list
val to_string : char flow -> string
val to_channel : Pervasives.out_channel -> char flow -> unit

Flow creation

Since 'a flow is compatible with 'a Stream.t, you may also create a flow with the functions of Stream or with the [< .. >] stream notation.

val seq : 'a -> ('a -> 'a) -> ('a -> bool) -> 'a flow
seq init step cond creates a flow that starts from init and obtains each subsequent element by applying step to the previous one, stopping as soon as cond fails. E.g. seq 1 ((+) 1) ((>) 100) returns [1, 2, ..., 99]. If cond init is false, the result is an empty flow.
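The following lazy model of seq is a sketch built on the Stdlib Seq.t type (OCaml 4.07 and later) rather than on 'a flow, so it is persistent rather than destructive; the definitions are illustrative, not the library's implementation. It reproduces the example above: an element is only computed when the consumer asks for it.

```ocaml
(* Lazy model of seq over Stdlib Seq.t. Unlike a real 'a flow,
   reading a Seq.t does not consume it. *)
let rec seq (init : 'a) (step : 'a -> 'a) (cond : 'a -> bool) : 'a Seq.t =
  fun () ->
    if cond init then Seq.Cons (init, seq (step init) step cond)
    else Seq.Nil

(* Collect a finite Seq.t into a list. *)
let to_list (s : 'a Seq.t) : 'a list =
  List.rev (Seq.fold_left (fun acc x -> x :: acc) [] s)
```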
val enum : ?fin:int -> int -> int flow
enum ~fin:m n creates the flow [n, n+1, ..., m]. It returns an empty flow if m < n. The argument ~fin is optional; when it is absent, the flow is infinite (unbounded above). The sequence continues from 0 after reaching max_int.
val (--) : int -> int -> int flow
n--m is a shorthand for enum ~fin:m n
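A similar Seq.t-based sketch of enum and (--), written only to illustrate the described semantics: the optional upper bound, the empty result when m < n, and the wrap-around from max_int to 0. Seq.t stands in for 'a flow here, and the helper take is a hypothetical name used to inspect a finite prefix of an infinite sequence.

```ocaml
(* Lazy model of enum over Stdlib Seq.t (a persistent stand-in for 'a flow). *)
let enum ?fin (n : int) : int Seq.t =
  (* As described above, the successor of max_int is 0. *)
  let succ k = if k = max_int then 0 else k + 1 in
  let rec from k () =
    match fin with
    | Some m when k > m -> Seq.Nil                  (* only possible at the start, when m < n *)
    | Some m when k = m -> Seq.Cons (k, Seq.empty)  (* m is the last element *)
    | _ -> Seq.Cons (k, from (succ k))
  in
  from n

let ( -- ) n m = enum ~fin:m n

(* First k elements of a possibly infinite Seq.t. *)
let rec take k (s : 'a Seq.t) : 'a list =
  if k <= 0 then []
  else
    match s () with
    | Seq.Nil -> []
    | Seq.Cons (x, rest) -> x :: take (k - 1) rest
```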
val repeat : ?times:int -> 'a -> 'a flow
repeat ~times:n x creates a flow consisting of n copies of x. It returns an infinite flow when ~times is absent, and an empty flow when times <= 0.
val cycle : ?times:int -> 'a flow -> 'a flow
cycle is similar to repeat, except that the content to repeat is a subflow rather than a single element. Note that times is the number of repetitions, not the length of the resulting flow.
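The difference between times and the length of the result is easiest to see on a small model. This sketch uses Stdlib Seq.t as a persistent stand-in for 'a flow, so cycle can simply re-read its subflow; a destructive flow would need to keep the subflow's elements around in order to replay them. The names repeat, cycle, append and take below are illustrative, not the library's code.

```ocaml
(* First k elements of a possibly infinite Seq.t. *)
let rec take k (s : 'a Seq.t) : 'a list =
  if k <= 0 then []
  else match s () with
    | Seq.Nil -> []
    | Seq.Cons (x, rest) -> x :: take (k - 1) rest

(* Lazy concatenation of two Seq.t values. *)
let rec append (a : 'a Seq.t) (b : 'a Seq.t) : 'a Seq.t = fun () ->
  match a () with
  | Seq.Nil -> b ()
  | Seq.Cons (x, rest) -> Seq.Cons (x, append rest b)

(* times = None means infinite; Some k with k <= 0 gives an empty sequence. *)
let repeat ?times x : 'a Seq.t =
  let rec go remaining () =
    match remaining with
    | Some k when k <= 0 -> Seq.Nil
    | Some k -> Seq.Cons (x, go (Some (k - 1)))
    | None -> Seq.Cons (x, go None)
  in
  go times

(* times counts repetitions of the whole subsequence.
   An empty sub with no ~times never yields and diverges when forced. *)
let cycle ?times (sub : 'a Seq.t) : 'a Seq.t =
  let rec go remaining () =
    match remaining with
    | Some k when k <= 0 -> Seq.Nil
    | Some k -> append sub (go (Some (k - 1))) ()
    | None -> append sub (go None) ()
  in
  go times
```

For example, cycle ~times:2 applied to the three-element subsequence 1, 2, 3 yields six elements, not two.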

Flow consuming

Flows are destructive and lazy, just like streams: visiting (or pattern matching on) the head of a flow both evaluates the element and removes it from the flow. All functions in this part are destructive except peek, which only forces the evaluation.

val peek : 'a flow -> 'a option
same as Stream.peek
val next : 'a flow -> 'a
same as Stream.next
val iter : ('a -> unit) -> 'a flow -> unit
same as Stream.iter
val foldl : ('a -> 'b -> 'a * bool option) -> 'a -> 'b flow -> 'a
foldl f init flow is a lazy fold_left. f accu elt should return (new_accu, state), where new_accu is the usual accumulation result and state is a flag telling whether the computation should continue and whether the last accumulation step is valid: None means continue; Some b means stop, where b = true means the last step is still valid and b = false means the last step is invalid and should be reverted.
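The stop protocol of foldl can be modelled on an ordinary list. In the sketch below (an eager, finite-input model, not the library's code), capped reverts the step that pushes the sum past 10, while reach keeps the step that brings the sum to 10 or more. foldl_list, capped and reach are hypothetical names.

```ocaml
(* Eager list model of foldl's stop protocol. *)
let foldl_list (f : 'a -> 'b -> 'a * bool option) (init : 'a) (l : 'b list) : 'a =
  let rec go acc = function
    | [] -> acc
    | x :: rest ->
      (match f acc x with
       | acc', None -> go acc' rest   (* keep folding *)
       | acc', Some true -> acc'      (* stop and keep the last step *)
       | _, Some false -> acc)        (* stop and revert the last step *)
  in
  go init l

(* Add elements while the sum stays within 10; the overflowing step is reverted. *)
let capped acc x =
  let s = acc + x in
  if s > 10 then (s, Some false) else (s, None)

(* Add elements until the sum reaches 10; the step that reaches it is kept. *)
let reach acc x =
  let s = acc + x in
  (s, if s >= 10 then Some true else None)
```

On [4; 4; 4; 4], capped yields 8 and reach yields 12. Since, per the description above, foldl stops as soon as f returns Some _, the same f should also terminate on an infinite flow such as repeat 4.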
val foldr : ('a -> 'b lazy_t -> 'b) -> 'b -> 'a flow -> 'b
foldr f init flow is a lazy fold_right. Unlike the usual fold_right, the accumulator argument of f elt accu is lazy, so f can avoid forcing accu when the current element elt determines the result by itself.
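The benefit of the lazy accumulator shows up in short-circuiting folds. The sketch below defines a foldr with the same shape of f over Stdlib Seq.t (a stand-in for 'a flow) and uses it to write a hypothetical exists that terminates even on an infinite sequence, because || never forces the accumulator once p x holds.

```ocaml
(* Model of foldr over Stdlib Seq.t: the rest of the fold is passed as a lazy value. *)
let rec foldr (f : 'a -> 'b Lazy.t -> 'b) (init : 'b) (s : 'a Seq.t) : 'b =
  match s () with
  | Seq.Nil -> init
  | Seq.Cons (x, rest) -> f x (lazy (foldr f init rest))

(* Stops at the first match: || does not evaluate Lazy.force acc when p x is true. *)
let exists p s = foldr (fun x acc -> p x || Lazy.force acc) false s

(* The infinite sequence n, n+1, n+2, ... *)
let rec nats n () = Seq.Cons (n, nats (n + 1))
```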
val fold : ('a -> 'a -> 'a * bool option) -> 'a flow -> 'a
fold is foldl without an initial value: the first element of flow is taken as init. It raises the End_of_flow exception when the input flow is empty.

Flow arithmetic

All the functions in this part are lazy.

val cons : 'a -> 'a flow -> 'a flow
cons x flow equals [<'x; flow>].
val apnd : 'a flow -> 'a flow -> 'a flow
apnd fla flb equals [<fla;flb>].
val is_empty : 'a flow -> bool
is_empty flow tests whether flow is empty. Note, however, that it forces the evaluation of the head element, if any.
val filter : ('a -> bool) -> 'a flow -> 'a flow
filter test flow picks all the elements of flow that satisfy test and returns them, in the same order, as a flow.
val concat : 'a flow flow -> 'a flow
concat concatenates a flow of flows into a single flow.
val take : int -> 'a flow -> 'a flow
take n flow returns the prefix of flow of length n, or flow itself if n is greater than the length of flow
val drop : int -> 'a flow -> 'a flow
drop n flow returns the suffix of flow after the first n elements, or an empty flow if n is greater than the length of flow.
val take_while : ('a -> bool) -> 'a flow -> 'a flow
take_while test flow returns the longest (possibly empty) prefix of flow of elements that satisfy test.
val drop_while : ('a -> bool) -> 'a flow -> 'a flow
drop_while test flow returns the remaining suffix of take_while test flow.
val span : ('a -> bool) -> 'a flow -> 'a flow * 'a flow
span test flow is equivalent to (take_while test flow, drop_while test flow).
val break : ('a -> bool) -> 'a flow -> 'a flow * 'a flow
break test flow is equivalent to span (fun x -> not (test x)) flow
val group : ('a -> bool) -> 'a flow -> 'a flow flow
group test flow divides flow into a flow of sub-flows, where each sub-flow is a maximal run of consecutive elements that give the same test result.
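To make the grouping rule concrete, here is an eager list model of group (a sketch, not the library's lazy implementation; group_list is a hypothetical name). With an even/odd test, each run of consecutive odd or even numbers becomes its own group, so grouping [1; 3; 2; 4; 6; 5] by evenness gives [[1; 3]; [2; 4; 6]; [5]].

```ocaml
(* Eager list model of group: split l into maximal runs with equal test results. *)
let group_list (test : 'a -> bool) (l : 'a list) : 'a list list =
  (* Push the current run (kept in reverse order) onto the result, if non-empty. *)
  let flush cur groups = match cur with [] -> groups | _ -> List.rev cur :: groups in
  let rec go groups cur key = function
    | [] -> List.rev (flush cur groups)
    | x :: rest ->
      let k = test x in
      (match cur with
       | _ :: _ when k = key -> go groups (x :: cur) key rest  (* same run *)
       | _ -> go (flush cur groups) [x] k rest)               (* start a new run *)
  in
  go [] [] false l
```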

Flow pair arithmetic

All the functions in this part are lazy.

val dup : 'a flow -> 'a flow * 'a flow
dup flow returns a pair of flows, each identical to flow. Since flows are destructive, the point of dup is to return two flows that can be consumed independently.
val comb : 'a flow * 'b flow -> ('a * 'b) flow
comb transforms a pair of flows into a flow of pairs of corresponding elements. If one flow is shorter, the excess elements of the longer flow are ignored.
val split : ('a * 'b) flow -> 'a flow * 'b flow
split is the opposite of comb
val merge : (bool -> 'a -> bool) -> 'a flow * 'a flow -> 'a flow
merge test (flowa, flowb) merges the elements of flowa and flowb into a single flow. The bool argument identifies an input flow: true stands for the first and false for the second. test is applied to each element of the output flow together with the id of the input flow it was taken from, and its result is the id of the flow the next element should come from. The first element is always taken from flowa. When one flow runs out of elements, merge keeps taking elements from the other flow until both flows are exhausted.
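Because the merge rule is easy to misread, the following eager list model spells it out (a sketch of the described behaviour, not the library's code; merge_list is a hypothetical name). With test = fun id _ -> not id, every element requests the other input next, so the two inputs are interleaved until the shorter one runs out, after which the rest of the longer one follows.

```ocaml
(* Eager list model of merge. want_a is the id of the flow the next element
   should come from (true = first flow). *)
let merge_list (test : bool -> 'a -> bool) ((a : 'a list), (b : 'a list)) : 'a list =
  let rec go want_a a b acc =
    match a, b with
    | [], [] -> List.rev acc
    | x :: a', [] -> go (test true x) a' [] (x :: acc)   (* only the first flow is left *)
    | [], y :: b' -> go (test false y) [] b' (y :: acc)  (* only the second flow is left *)
    | x :: a', y :: b' ->
      if want_a then go (test true x) a' b (x :: acc)
      else go (test false y) a b' (y :: acc)
  in
  go true a b []  (* the first element always comes from the first flow *)
```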
val switch : ('a -> bool) -> 'a flow -> 'a flow * 'a flow
switch test flow splits flow into two flows: the first contains all the elements satisfying test, the second all the others. Within each output flow, the order of the source flow is preserved.

Flow array arithmetic

All the functions in this part are lazy.

val dupn : int -> 'a flow -> 'a flow array
dupn is the array version of dup. dupn n fl produces an array of n flows all identical to the input flow, but can be used independently.
val combn : 'a flow array -> 'a array flow
combn is the array version of comb. It takes an array of flows and combines the elements at corresponding positions into arrays, which are returned as a flow in the same order. When any of the flows runs out of elements, the remaining elements of all flows are ignored.
val splitn : int -> 'a array flow -> 'a flow array
splitn is the array version of split, and the inverse of combn. It transforms a flow of arrays into an array of flows. Because arrays in OCaml can vary in length, and because splitn should be fully lazy (and so must not inspect the head element), splitn takes an extra integer parameter n as the length of the resulting flow array. Every array in the input flow should have at least n elements; in each of them, any elements beyond the nth position are discarded.
val mergen : (int -> 'a -> int) -> 'a flow array -> 'a flow
mergen is the array version of merge. It takes an array of flows and merges them into a single flow. The order of elements is determined by the first parameter f, which is applied to each element of the output flow together with the id of the flow it came from, to decide the id of the flow the next element should come from. The first element always comes from the 0th flow. If f chooses a flow that has run out of elements, the element is taken from the next non-empty flow instead, until all the input flows are empty.
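A list-and-array model of mergen follows. The description does not say in which order "the next non-empty flow" is searched; this sketch assumes ascending ids that wrap around from n-1 to 0, and it also reduces out-of-range ids modulo n. mergen_list is a hypothetical name, and the code is not the library's implementation. In the first test below, flow 2 runs dry after one element, so the request for it is served by flow 0 instead.

```ocaml
(* Eager model of mergen. Assumption: the fallback search for a non-empty flow
   goes through ascending ids, wrapping around from n-1 to 0. *)
let mergen_list (f : int -> 'a -> int) (flows : 'a list array) : 'a list =
  let flows = Array.copy flows in  (* do not mutate the caller's array *)
  let n = Array.length flows in
  (* First non-empty flow at or after id i (taken modulo n), if any. *)
  let pick i =
    let i = ((i mod n) + n) mod n in
    let rec search k =
      if k = n then None
      else
        let j = (i + k) mod n in
        match flows.(j) with
        | [] -> search (k + 1)
        | _ :: _ -> Some j
    in
    search 0
  in
  let rec go want acc =
    match pick want with
    | None -> List.rev acc  (* every flow is exhausted *)
    | Some j ->
      (match flows.(j) with
       | x :: rest -> flows.(j) <- rest; go (f j x) (x :: acc)
       | [] -> List.rev acc)  (* unreachable: pick only returns non-empty ids *)
  in
  if n = 0 then [] else go 0 []
```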
val switchn : int -> ('a -> int) -> 'a flow -> 'a flow array
switchn is the array version of switch, and the inverse of mergen. switchn n f fl splits fl into an array of n flows; f is applied to each element of fl to decide the id of its destination flow.

Computation over flow

All the functions in this part are lazy.

val map : ('a -> 'b) -> 'a flow -> 'b flow
map f flow applies f in turn to the elements of flow and returns the results as a flow in the same order.
val map2 : ('a -> 'b -> 'c) -> 'a flow -> 'b flow -> 'c flow
map2 f flowa flowb applies f in turn to the elements at corresponding positions of flowa and flowb, and returns the results as a flow in the same order. If one flow is shorter, the excess elements of the longer flow are ignored.
val scanl : ('a -> 'b -> 'a) -> 'a -> 'b flow -> 'a flow
scanl f init flow returns a flow of successive reduced values from the left: scanl f init [<'e1;'e2;..>] = [<'init; '((f init e1) as e1'); '(f e1' e2); ..>]
val scan : ('a -> 'a -> 'a) -> 'a flow -> 'a flow
scan is similar to scanl but without the init value: scan f [<'e1;'e2;..>] = [<'e1;'(f e1 e2);..>].
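The equations above translate directly into a lazy sketch over Stdlib Seq.t, used here as a persistent stand-in for 'a flow; scanl, scan, nats and take below are illustrative definitions, not the library's. Because each element is produced on demand, scan (+) over the infinite sequence 1, 2, 3, ... can still be sampled: its first five elements are the running sums 1, 3, 6, 10, 15.

```ocaml
(* scanl f init s = init, f init e1, f (f init e1) e2, ... *)
let rec scanl (f : 'a -> 'b -> 'a) (init : 'a) (s : 'b Seq.t) : 'a Seq.t =
  fun () ->
    Seq.Cons (init, fun () ->
      match s () with
      | Seq.Nil -> Seq.Nil
      | Seq.Cons (x, rest) -> scanl f (f init x) rest ())

(* scan f s = e1, f e1 e2, ... : scanl with the first element as init. *)
let scan (f : 'a -> 'a -> 'a) (s : 'a Seq.t) : 'a Seq.t =
  fun () ->
    match s () with
    | Seq.Nil -> Seq.Nil
    | Seq.Cons (x, rest) -> scanl f x rest ()

(* The infinite sequence n, n+1, ... *)
let rec nats n () = Seq.Cons (n, nats (n + 1))

(* First k elements of a possibly infinite Seq.t. *)
let rec take k (s : 'a Seq.t) : 'a list =
  if k <= 0 then []
  else match s () with
    | Seq.Nil -> []
    | Seq.Cons (x, rest) -> x :: take (k - 1) rest
```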
val map_fold : ('a -> 'a -> 'a * bool option) -> 'a flow -> 'a flow
map_fold f flow applies fold f flow repeatedly; the successive folding results form the output flow.

Circular flow

All the functions in this part are lazy. It would be helpful to draw dataflow graph in order to better understand the functions of this part.

val feedl : 'a flow -> ('a flow -> 'a flow * 'b) -> 'b
feedl init f takes a function f of type 'a flow -> 'a flow * 'b and feeds the first (left) output of f back as the input of f itself, with init as the initial lead (prefix). The second (right) output of f is returned as the result.
val feedr : 'a flow -> ('a flow -> 'b * 'a flow) -> 'b
The same as feedl, except that the output flow fed back to f is its second (right) component.
val circ : 'a flow -> ('a flow -> 'a flow) -> 'a flow
circ init f takes a function f of type 'a flow -> 'a flow and feeds the output of f back as its input, with init as the initial lead (prefix). A copy of the output of f is returned as the result.
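A feedback loop like circ can be sketched in OCaml by tying the knot with a lazy value. The version below works on Stdlib Seq.t, a persistent stand-in for 'a flow, and does not memoize, so later elements are recomputed (fine for a demonstration, costly in general). With init = 0, 1 and f adding each element to its successor, the output is the Fibonacci sequence from its third term. circ, append, map2, tail and take here are hypothetical sketches, not the library's definitions.

```ocaml
(* Lazy concatenation. *)
let rec append (a : 'a Seq.t) (b : 'a Seq.t) : 'a Seq.t = fun () ->
  match a () with
  | Seq.Nil -> b ()
  | Seq.Cons (x, rest) -> Seq.Cons (x, append rest b)

(* Pointwise combination, truncated to the shorter input. *)
let rec map2 f (a : 'a Seq.t) (b : 'b Seq.t) : 'c Seq.t = fun () ->
  match a (), b () with
  | Seq.Cons (x, a'), Seq.Cons (y, b') -> Seq.Cons (f x y, map2 f a' b')
  | _ -> Seq.Nil

(* Drop the first element, lazily. *)
let tail (s : 'a Seq.t) : 'a Seq.t = fun () ->
  match s () with Seq.Nil -> Seq.Nil | Seq.Cons (_, rest) -> rest ()

(* The input of f is init followed by f's own output; the output is returned. *)
let circ (init : 'a Seq.t) (f : 'a Seq.t -> 'a Seq.t) : 'a Seq.t =
  let rec out = lazy (f (append init (fun () -> Lazy.force out ()))) in
  fun () -> Lazy.force out ()

let rec take k (s : 'a Seq.t) : 'a list =
  if k <= 0 then []
  else match s () with
    | Seq.Nil -> []
    | Seq.Cons (x, rest) -> x :: take (k - 1) rest

(* Input: 0, 1, out1, out2, ...; each output is an input element plus its successor. *)
let fibs = circ (List.to_seq [0; 1]) (fun s -> map2 ( + ) s (tail s))
```

In this sketch, f must not force its input before it is asked for output; otherwise forcing out inside its own definition raises Lazy.Undefined.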

High-level combinators

All the functions in this part are lazy. We call a function whose inputs and outputs are all flows a "flow processor", e.g. a function of type 'a flow -> 'b flow or 'a flow -> 'b flow * 'c flow.

val (|||) : ('a flow -> 'b flow) ->
('b flow -> 'c flow) -> 'a flow -> 'c flow
pipeline combinator, f1 ||| f2 feeds the output flow of f1 to f2 as its input flow. It's just the specialized version of fun f g x -> g (f x) on flow type.
val while_do : ?size:int ->
('a -> bool) ->
('a flow -> 'a flow) -> 'a flow -> 'a flow
while_do ~size:n cont f flow tests each element of the input flow with cont to decide whether it should keep looping. If the test returns false, the element is added to the output flow; if it returns true, the element is added to the input flow of f, whose output is then merged back into the input flow. The optional argument size is the maximum number of elements running in parallel inside the loop. The default value is 1, in which case the order of elements is preserved; however, it also means that if an element loops forever, no element behind it will ever proceed.
val do_while : ?size:int ->
('a -> bool) ->
('a flow -> 'a flow) -> 'a flow -> 'a flow
Similar to while_do, except that each element is sent to f before being tested by cont.
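In the simplest configuration, size = 1 with a loop body that maps a per-element function g over its input, both combinators reduce to per-element loops. The sketch below models only that special case (an assumption: the real f is an arbitrary flow processor), and while_do_list and do_while_list are hypothetical names. The two differ only in whether cont is checked before or after g, so with cont = fun x -> x < 100 and g doubling, the element 150 passes through while_do unchanged but leaves do_while as 300.

```ocaml
(* Element-level models, assuming size = 1 and f = map g. *)

(* Test first: an element leaves the loop as soon as cont fails. *)
let while_do_list (cont : 'a -> bool) (g : 'a -> 'a) (l : 'a list) : 'a list =
  let rec loop x = if cont x then loop (g x) else x in
  List.map loop l

(* Body first: every element goes through g at least once. *)
let do_while_list (cont : 'a -> bool) (g : 'a -> 'a) (l : 'a list) : 'a list =
  let rec loop x =
    let y = g x in
    if cont y then loop y else y
  in
  List.map loop l
```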
val farm : ?par:int ->
?size:(int -> int) ->
?path:('a -> int) ->
(int -> 'a flow -> 'b flow) -> 'a flow -> 'b flow
farm is a task farm generator. farm ~par:n ~size:s ~path:p gen produces a super flow processor of type 'a flow -> 'b flow built from n sub flow processors of type 'a flow -> 'b flow, obtained by applying gen to the ids 0, 1, ..., n-1. Each element of the super processor's input flow is fed to the sub flow processor whose id is given by applying p to that element. The size (capacity) function s maps the id of a sub flow processor to the maximum number of elements that can run concurrently inside it. The output flows of the sub flow processors are collected and merged into the output flow of the super flow processor. When merging, farm first tries to take an element from the nearest sub flow processor that has reached its capacity, counting in ascending id order from the processor the last element was collected from; if there is no such processor, it takes an element from the nearest sub flow processor that has an active element inside. By default, par is 1, size gives every sub flow processor a capacity of 1, and path is a round-robin starting from 0; in this default case, the evaluation is always strict.

Shorthand operators

Useful in point-free style programming; the implementations are self-documenting.

val (|>) : 'a -> ('a -> 'b) -> 'b
val (@.) : ('a -> 'b) -> 'a -> 'b
val (|-) : ('a -> 'b) -> ('b -> 'c) -> 'a -> 'c
val (-|) : ('a -> 'b) -> ('c -> 'a) -> 'c -> 'b
val (//) : ('a -> 'b) -> ('c -> 'd) -> 'a * 'c -> 'b * 'd
val curry : ('a * 'b -> 'c) -> 'a -> 'b -> 'c
val uncurry : ('a -> 'b -> 'c) -> 'a * 'b -> 'c
val id : 'a -> 'a
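Since these operators are described as self-documenting, here is one plausible set of definitions matching the signatures above; for pure, total functions the types leave essentially one choice each. This is a sketch, not copied from the library. Note that defining (|>) this way shadows the Stdlib operator of the same name, which behaves identically.

```ocaml
let ( |> ) x f = f x                  (* reverse application *)
let ( @. ) f x = f x                  (* application *)
let ( |- ) f g x = g (f x)            (* left-to-right composition *)
let ( -| ) f g x = f (g x)            (* right-to-left composition *)
let ( // ) f g (a, c) = (f a, g c)    (* apply a pair of functions to a pair *)
let curry f a b = f (a, b)
let uncurry f (a, b) = f a b
let id x = x
```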