// An F# implementation of Java-style Streams
// F# has something equivalent called sequences, but I think Java streams
// are better organized.

open System
open System.Collections.Generic;;

type Vec<'T> = ResizeArray<'T>;;  // type alias

/// A mutable, single-pass stream of values.
///
/// Elements are pulled with `next()`; `is_atend` is consulted *before* each
/// pull.  `filter`, `limit` and `until` modify the receiver in place and
/// return it, while `map`, `append` and `bind` build a new stream that pulls
/// from the receiver (so the receiver is consumed as the new stream is read).
type stream<'T> =
    {
        mutable seeds : Vec<'T>                 // finite portion
        mutable generator : stream<'T> -> 'T    // generates new element
        mutable is_atend : stream<'T> -> bool   // predicate for end of stream
        mutable counter : int                   // how many generated
        mutable last : option<'T>               // last value generated
    }

    /// Produces the next element, incrementing `counter` and recording the
    /// element in `last`.  Does not check `is_atend`.
    member this.next() =
        let answer = this.generator(this)
        this.counter <- this.counter + 1
        this.last <- Some(answer)
        answer

    /// Like `next()`, but returns `None` when the stream reports it is at end.
    member this.try_next() =
        if this.is_atend(this) then None else Some(this.next())

    // basic monadic combinators:

    /// Returns a new stream whose elements are `f` applied to the elements of
    /// this stream.
    ///
    /// NOTE: `f` is applied eagerly to every seed (and to `last`, if any) when
    /// the mapped stream is constructed, so a side-effecting `f` fires for
    /// those values at construction time.
    member this.map<'U> (f: 'T -> 'U) =
        let vec = Vec<'U>()
        for x in this.seeds do vec.Add(f x)
        {
            stream.seeds = vec
            counter = this.counter
            is_atend = fun _ -> this.is_atend(this)
            // Pull through next() (not the raw generator) so the source's
            // counter/last advance; otherwise a limit/until installed on the
            // source before map would never trigger.
            generator = fun _ -> f(this.next())
            last = Option.map f this.last
        }

    /// Returns a new stream that yields the elements of this stream followed
    /// by the elements of `other`.
    member this.append(other:stream<'T>) =
        // the generator must call next() on two streams to advance them.
        // the counter of the appended stream will also be advanced.
        {
            stream.counter = this.counter
            seeds = Vec<'T>()
            last = this.last
            is_atend = fun _ -> this.is_atend(this) && other.is_atend(other)
            generator = fun _ ->
                if not(this.is_atend(this)) then this.next() else other.next()
        }

    /// Monadic bind: maps every element to a stream with `bf` and yields the
    /// elements of those streams one after another.
    ///
    /// Inner streams that are empty are skipped.  Checking `is_atend` may
    /// therefore not terminate if the source is an infinite stream of empty
    /// streams.
    member this.bind<'U>(bf:'T -> stream<'U>) : stream<'U> =
        // The inner stream currently being drained (None before the first pull).
        let mutable current : option<stream<'U>> = None
        let current_exhausted () =
            match current with
            | Some inner -> inner.is_atend(inner)
            | None -> true
        // Move on to the next non-empty inner stream, if there is one.
        let advance () =
            while current_exhausted () && not(this.is_atend(this)) do
                current <- Some(bf(this.next()))
        {
            stream.counter = 0
            seeds = Vec<'U>()
            last = None
            is_atend = fun _ ->
                advance ()
                current_exhausted ()
            generator = fun _ ->
                advance ()
                match current with
                | Some inner when not(inner.is_atend(inner)) -> inner.next()
                | _ -> invalidOp "bind: the stream is exhausted"
        }

    /// Restricts this stream (in place) to the elements satisfying `p` and
    /// returns it.
    ///
    /// A one-element look-ahead is kept so that `is_atend` is accurate even
    /// when the remaining elements of a finite stream all fail `p`.  Checking
    /// `is_atend` on an infinite stream with no further matching element does
    /// not terminate.
    member this.filter(p : 'T -> bool) =
        let gen = this.generator
        let atend = this.is_atend
        // Next matching element, already pulled from the underlying generator.
        let mutable pending : option<'T> = None
        let fill () =
            while Option.isNone pending && not(atend this) do
                let candidate = gen this
                if p candidate then pending <- Some candidate
        this.is_atend <- fun _ ->
            fill ()
            Option.isNone pending
        this.generator <- fun _ ->
            fill ()
            match pending with
            | Some value ->
                pending <- None
                value
            | None -> invalidOp "filter: no remaining element satisfies the predicate"
        this

    /// Applies `act` to every remaining element of the stream.
    member this.foreach (act : 'T -> unit) : unit =
        while not(this.is_atend(this)) do
            act (this.next())

    /// Ends this stream (in place) once `n` elements have been generated.
    member this.limit n =
        let atend = this.is_atend
        this.is_atend <- fun _ -> this.counter >= n || atend(this)
        this

    /// Ends this stream (in place) right after an element satisfying `p` has
    /// been generated; that element itself is still produced.
    member this.until (p: 'T -> bool) =
        let atend = this.is_atend
        this.is_atend <- fun _ -> Option.exists p this.last || atend(this)
        this

    /// Returns true when every remaining element satisfies `p`; stops
    /// consuming at the first element that does not.  Every produced element
    /// is tested, including the one that terminates an `until` stream.
    member this.for_all (p: 'T -> bool) =
        let mutable answer = true
        while answer && not(this.is_atend(this)) do
            if not(p (this.next())) then answer <- false
        answer

    /// Returns true when at least one remaining element satisfies `p`.
    member this.there_exists(p: 'T -> bool) =
        not(this.for_all(fun x -> not(p x)))

/// Flattens a stream of streams into a single stream (bind with identity).
/// May not terminate if the source is an infinite stream of empty streams.
let flatten<'T> (srcstream:stream<stream<'T>>) : stream<'T> =
    srcstream.bind(fun inner -> inner)

///// constructors

/// Creates a finite stream over the elements that `vec` holds at the time of
/// the call (the element count is captured once).
let vector_stream<'T>(vec:Vec<'T>) =
    let n = vec.Count
    let mutable internal_cx = 0   // don't use counter: filter won't work
    {
        stream.counter = 0
        seeds = vec
        is_atend = fun _ -> internal_cx >= n
        generator = fun s ->
            internal_cx <- internal_cx + 1
            s.seeds.[internal_cx - 1]
        last = None
    };;

/// Creates a finite stream over the elements of a list.
let finite_stream<'T>(A:'T list) =
    let vec = Vec<'T>()
    for x in A do vec.Add(x)
    vector_stream(vec)

/// Creates an infinite stream whose generator is `iterator` applied to the
/// (mutable) seed vector; `iterator` may read and update the seeds.
let strong_induction_stream<'T>(seeds1:'T list, iterator: Vec<'T> -> 'T) =
    let vec = Vec<'T>()
    for x in seeds1 do vec.Add(x)
    {
        stream.counter = 0
        seeds = vec
        is_atend = fun _ -> false   // default infinite stream
        generator = fun strm -> iterator (strm.seeds)
        last = None
    };;

/// Creates the infinite stream seed, iter(seed), iter(iter(seed)), ...
let induction_stream<'T>(seed:'T, iter:'T -> 'T) =
    strong_induction_stream([seed], fun vec ->
        let v = vec.[0]
        vec.[0] <- iter(v)
        v)

// create stream by providing generator
/// Creates an infinite stream whose elements are successive results of `genfun()`.
let generate_stream<'T> (genfun:unit -> 'T) =
    {
        stream.counter = 0
        seeds = Vec<'T>()
        is_atend = fun _ -> false   // default infinite stream
        generator = fun _ -> genfun()
        last = None
    }

/// Creates a stream that is at end from the start.
let empty_stream<'T> () =
    {
        stream.seeds = Vec<'T>()
        stream.counter = 0
        stream.is_atend = fun _ -> true
        stream.generator = fun strm -> strm.seeds.[0]   // will never be called
        stream.last = None
    }

/// Creates a one-element stream from `Some x`, or an empty stream from `None`.
let option_stream<'T> (opt:'T option) =
    match opt with
    | Some x -> finite_stream([x])
    | None -> empty_stream()

/// Creates a finite stream of the `Ok` payloads in `results`; errors are dropped.
let results_stream<'T,'E> (results:Result<'T,'E> list) =
    let rv = Vec<'T>()
    for r in results do
        match r with
        | Ok x -> rv.Add(x)
        | Error _ -> ()
    vector_stream(rv)

//////////////////////////// demo

let mutable x = 1;
let odds = generate_stream( fun () -> let v= x in (x<-x+2; v) ).limit(10)
odds.foreach (printf "%d ")
printfn ""

let evens = induction_stream(0, fun n -> n+2)
evens
    .filter(fun x -> x%4=0)
    .map(fun x -> float(x)/2.0)
    .until(fun x -> x>99.0)
    .foreach(printf "%.2f ")
printfn ""

//finite_stream([2;3;5;7;11;13;17;19]).foreach(printfn "%d")

let primes = Vec<int>()
for x in [2;3;5;7;11] do primes.Add(x)
let candidates = induction_stream(13, fun n -> n+2)

// A candidate is prime when no known prime up to about its square root divides it.
let is_prime_candidate (candidate: int) =
    vector_stream(primes)
        .until(fun p -> float(p) > 1.0+Math.Sqrt(float(candidate)))
        .for_all(fun p -> candidate % p <> 0)

let all_primes =
    candidates
        .filter(is_prime_candidate)
        .map(fun p -> primes.Add(p); p)
        .until(fun p -> p<2)   // until overflow
all_primes.until(fun p->p>500).foreach (printf "%d ")

printfn "\ntesting bind ..."

// Runs f, turning any exception into None (deliberately broad: demo helper).
let on_behalf f =
    try Some(f())
    with _ -> None

let mutable xx = 0
let nested =
    generate_stream(fun () ->
        xx <- xx+1
        let next = if xx%3=0 then None else Some(xx)
        option_stream(next))
//nested.limit(10).foreach(printfn "%A")

let safediv a b = if b<>0 then Some(a/b) else None

induction_stream(0,fun x -> x+10)
    .bind( fun x -> finite_stream([x;x+1;x+2]) )
    .limit(20)
    .foreach( printf "%d " )
printfn "\nfirst test done"

let nested2 =
    induction_stream(0,fun x -> x+10)
        .map( fun x -> finite_stream([x;x+1;x+2]) )
(flatten nested2)
    .limit(20)
    .foreach( printf "%d " )
printfn "\neos"

// NOTE(review): the explicit type argument was garbled in the source;
// stream<int> is assumed here.
let gss = generate_stream<stream<int>>(fun () -> (option_stream None))
(flatten (gss.limit(10)))
    .foreach(printf ":: %A")
printfn "done"

(*
let sa = finite_stream([1;2;3])
sa.map(fun x -> finite_stream([x*10;x*10+1;x*10+2]))
  .limit(100)
  .foreach( printf "%A " )
printfn ""
*)

finite_stream([0;1;2;3;4;5;6;7;8;9;10])
    .filter(fun x -> x%2=1)
    .foreach(printf "%d ")
printfn ""