generator/block to iterator/stream conversion

开发者 https://www.devze.com, 2023-01-17 10:18, source: web

Basically I want to convert this:

def data(block: T => Unit)

to a Stream (dataToStream is a hypothetical function that does this conversion):

val dataStream: Stream[T] = dataToStream(data)

I suppose this problem could be resolved by continuations:

// let's assume that we don't know how data is implemented
// we just know that it generates integers
def data(block: Int => Unit) { for (i <- 0 to 10) block(i) }

// here we can print all data integers
data { i => println(i) }

// >> but what we really want is to convert data to the stream <<

// very dumb solution is to collect all data into a list
var dataList = List[Int]()
data { i => dataList = i::dataList }
// and make a stream from it (reversed, because the elements were prepended)
dataList.reverse.toStream

// but we want to make a lazy, CPU and memory efficient stream or iterator from data
val dataStream: Stream[Int] = dataToStream(data)
dataStream.foreach { i => println(i) }

// and here the black magic of continuations must be used
// for me this magic is too hard to understand
// Does anybody know what the dataToStream function could look like?

Thanks, Dawid


EDITED: Modified the examples to show the laziness of traversable.view

scala> def data(f : Int => Unit) = for(i <- 1 to 10) {    
     |   println("Generating " + i)
     |   f(i)
     | }
data: (f: (Int) => Unit)Unit

scala> def toTraversable[T]( func : (T => Unit) => Unit) = new Traversable[T] {
     |   def foreach[X]( f : T => X) = func(f(_) : Unit)                       
     | }                                                                       
toTraversable: [T](func: ((T) => Unit) => Unit)java.lang.Object with Traversable[T]

The toTraversable method will convert your data function into a Traversable collection. By itself, it's nothing huge, but you can convert this to a TraversableView which is lazy. Here's an example:

scala> toTraversable(data).view.take(3).sum
Generating 1
Generating 2
Generating 3
Generating 4
res1: Int = 6

The unfortunate nature of the take method is that it must go one past the last value generated to work correctly, but it will terminate early. The above code would look the same without the ".view" call. However, here's a more compelling example:

scala> toTraversable(data).view.take(2).foreach(println)
Generating 1
1
Generating 2
2
Generating 3
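
As a side note on how a callback-style generator can be stopped early without threads: one option is to abort the callback with a control-flow exception. The following is a minimal sketch of that idea using scala.util.control.Breaks. EarlyExit and takeFrom are hypothetical names, and this is not claimed to be how the library's take is implemented. Unlike the view-based examples above, it breaks right after the n-th element is stored, so the generator is not asked for an extra value.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks.{break, breakable}

object EarlyExit {
  // Hypothetical helper: collect at most n values from a callback-style
  // generator, aborting the generator as soon as n values are stored.
  def takeFrom[T](gen: (T => Unit) => Unit, n: Int): List[T] = {
    val buf = ListBuffer.empty[T]
    if (n > 0) {
      breakable {
        gen { x =>
          buf += x
          // break() throws a control exception that unwinds out of gen
          if (buf.size >= n) break()
        }
      }
    }
    buf.toList
  }
}
```

This only works if the generator does not swallow the control exception (for example with a catch-all handler around the callback).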

So in conclusion, I believe the collection you're looking for is TraversableView, which is easiest to create by making a Traversable and then calling "view" on it. If you really want the Stream type, here's a method that works in 2.8.0.final and will make a "Stream" without threads:

scala> def dataToStream( data : (Int => Unit) => Unit) = {
     |   val x = new Traversable[Int] {                     
     |     def foreach[U](f : Int => U) = {                 
     |        data( f(_) : Unit)                            
     |     }
     |   }
     |   x.view.toList.toStream                             
     | }
dataToStream: (data: ((Int) => Unit) => Unit)scala.collection.immutable.Stream[Int]

scala> dataToStream(data)
res8: scala.collection.immutable.Stream[Int] = Stream(0, ?)

The unfortunate nature of this method is that it will iterate over the entire traversable before making the stream. This also means all the values need to be buffered in memory. The only alternative is to resort to threads.

As an aside: This was the motivating reason to prefer Traversables as direct returns from scalax.io.File methods: "lines" "chars" and "bytes".
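
The foreach-only Traversable above relies on the 2.8 collections. In Scala 2.13 and later, Traversable is a deprecated alias for Iterable, which needs an iterator method, so defining only foreach no longer compiles there. As a rough sketch for that situation, a small hand-written wrapper can offer the same lazy, push-style behaviour. Pushed is a hypothetical name, not a library class, and it supports only foreach, map and withFilter:

```scala
// Hypothetical wrapper around a callback-style generator.
// Nothing runs until foreach is called; map and withFilter only
// compose callbacks, so no intermediate collection is built.
final class Pushed[T](gen: (T => Unit) => Unit) {
  def foreach[U](f: T => U): Unit =
    gen { x => f(x); () }

  def map[B](g: T => B): Pushed[B] =
    new Pushed[B](k => gen(x => k(g(x))))

  def withFilter(p: T => Boolean): Pushed[T] =
    new Pushed[T](k => gen(x => if (p(x)) k(x)))
}
```

Because foreach, map and withFilter are defined, the wrapper can be used in a for comprehension, e.g. for (x <- new Pushed(data) if x % 2 == 0) println(x). The generator runs again on every traversal, just as with the Traversable version.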


Here's a simple solution that spawns a thread that consumes the data. It posts the data to a SynchronousQueue. A stream that pulls data from the queue is created and returned:

 def generatortostream[T](f: (T=>Unit)=>Unit): Stream[T] = {
  val queue = new java.util.concurrent.SynchronousQueue[Option[T]]
  val callbackthread = new Runnable {
    def run() { f((Some(_:T)) andThen (queue.put(_))); queue.put(None) }
  }   
  new Thread(callbackthread).start()
  Stream.continually(queue.take).takeWhile(_.isDefined).map(_.get)
}   
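
Two things to watch with the thread-based approach: if the consumer stops reading early, the producer thread stays blocked in put forever, and if the generator throws, the end marker is never sent and the consumer hangs. A possible variant, as a sketch only, marks the producer as a daemon thread (so a stuck producer does not keep the JVM alive) and sends the end marker in a finally block. It returns an Iterator instead of a Stream. GeneratorIterator and generatorToIterator are hypothetical names:

```scala
import java.util.concurrent.SynchronousQueue

object GeneratorIterator {
  // Hypothetical variant of generatortostream that returns an Iterator.
  def generatorToIterator[T](f: (T => Unit) => Unit): Iterator[T] = {
    val queue = new SynchronousQueue[Option[T]]
    val producer = new Thread(new Runnable {
      def run(): Unit = {
        // Always send the end marker, even if the generator throws.
        try f(x => queue.put(Some(x)))
        finally queue.put(None)
      }
    })
    // A daemon thread does not prevent JVM exit if the consumer
    // abandons the iterator while the producer is blocked in put.
    producer.setDaemon(true)
    producer.start()
    // Each element is handed over only when the consumer asks for it.
    Iterator.continually(queue.take()).takeWhile(_.isDefined).map(_.get)
  }
}
```

An abandoned producer thread still exists until the JVM exits, so this is a mitigation rather than a full fix. Calling .toStream on the result gives a Stream where that type is needed.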


I still have to figure out how to do that myself. I suspect the answer lies somewhere here:

  • Jim McBeath's blog, for example Standalone Generic Scala Generator
  • This other blog - Notas van de Vos Generators in Scala

Edit: removed code that showed how to solve a different problem.

Edit2: Using the code at http://gist.github.com/580157 (initially posted at http://gist.github.com/574873), you can do this:

object Main {
  import Generator._

  def data = generator[Int] { yld =>
    for (i <- suspendable(List.range(0, 11))) yld(i)
  }

  def main(args: Array[String]) {
    for( i <- data.toStream ) println(i)
  }
}

data does not take a block of code, but I think this is fine because with the continuation, the block can be handled by the caller. The code for Generator can be seen in the gist on GitHub.


Here's a delimited continuations-based implementation, adapted from @Geoff Reedy's offering:

import Stream._
import scala.util.continuations._
import java.util.concurrent.SynchronousQueue

def toStream[A](data: (A=>Unit)=>Unit):Stream[A] = reset {
    val queue = new SynchronousQueue[Option[A]]
    queue.put(Some(shift { k: (A=>Unit) =>
        new Thread() { 
            override def run() {
                data(k)
                // when (if) the data source stops pumping, add None 
                // to signal that the stream is dead
                queue.put(None)
            }
        }.start()
        continually(queue.take).takeWhile(_.isDefined).map(_.get)
    }))
}