10. Scala中的一步操作
在scala中可以方便的实现异步操作,这里是通过Future来实现的,和java中的Future很相似,但是功能更加强大。
定义返回Future的方法
下面我们看下如何定义一个返回Future的方法:
println("Step 1: Define a method which returns a Future")
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
def donutStock(donut: String): Future[Int] = Future {
// assume some long running database operation
println("checking donut stock")
10
}
注意这里需要引入scala.concurrent.ExecutionContext.Implicits.global, 它会提供一个默认的线程池来异步执行Future。
阻塞方式获取Future的值
println("\nStep 2: Call method which returns a Future")
import scala.concurrent.Await
import scala.concurrent.duration._
val vanillaDonutStock = Await.result(donutStock("vanilla donut"), 5 seconds)
println(s"Stock of vanilla donut = $vanillaDonutStock")
donutStock() 是异步执行的,我们可以使用Await.result() 来阻塞主线程来等待donutStock()的执行结果。
下面是其输出:
Step 2: Call method which returns a Future
checking donut stock
Stock of vanilla donut = 10
非阻塞方式获取Future的值
我们可以使用Future.onComplete() 回调来实现非阻塞的通知:
println("\nStep 2: Non blocking future result")
import scala.util.{Failure, Success}
donutStock("vanilla donut").onComplete {
case Success(stock) => println(s"Stock for vanilla donut = $stock")
case Failure(e) => println(s"Failed to find vanilla donut stock, exception = $e")
}
Thread.sleep(3000)
Future.onComplete()
有两种可能情况,Success 或者 Failure,需要引入: import scala.util.{Failure, Success}
。
Future链
有时候我们需要在获得一个Future之后再继续对其进行操作,有点类似于java中的管道,下面看一个例子:
println("\nStep 2: Define another method which returns a Future")
def buyDonuts(quantity: Int): Future[Boolean] = Future {
println(s"buying $quantity donuts")
true
}
上面我们又定义了一个方法,用来接收donutStock()的返回值,然后再返回一个Future[Boolean]
。
我们看下使用flatmap该怎么链接他们:
println("\nStep 3: Chaining Futures using flatMap")
val buyingDonuts: Future[Boolean] = donutStock("plain donut").flatMap(qty => buyDonuts(qty))
import scala.concurrent.Await
import scala.concurrent.duration._
val isSuccess = Await.result(buyingDonuts, 5 seconds)
println(s"Buying vanilla donut was successful = $isSuccess")
同样的,我们还可以使用for语句来进行链接:
println("\nStep 3: Chaining Futures using for comprehension")
for {
stock <- donutStock("vanilla donut")
isSuccess <- buyDonuts(stock)
} yield println(s"Buying vanilla donut was successful = $isSuccess")
Thread.sleep(3000)
flatmap VS map
map就是对集合中的元素进行重映射,而flatmap则会将返回的值拆散然后重新组合。 下面举个直观的例子:
val buyingDonuts: Future[Boolean] = donutStock("plain donut").flatMap(qty => buyDonuts(qty))
flatMap返回的值是Future[Boolean]
。
val buyingDonuts: Future[Future[Boolean]] = donutStock("plain donut").Map(qty => buyDonuts(qty))
map返回的值是Future[Future[Boolean]]
。
Future.sequence() VS Future.traverse()
如果我们有很多个Future,然后想让他们并行执行,则可以使用 Future.sequence() 。
println(s"\nStep 2: Create a List of future operations")
val futureOperations = List(
donutStock("vanilla donut"),
donutStock("plain donut"),
donutStock("chocolate donut")
)
println(s"\nStep 5: Call Future.sequence to run the future operations in parallel")
val futureSequenceResults = Future.sequence(futureOperations)
futureSequenceResults.onComplete {
case Success(results) => println(s"Results $results")
case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")
}
Future.traverse() 和Future.sequence() 类似, 唯一不同的是,Future.traverse()可以对要执行的Future进行操作,如下所示:
println(s"\nStep 3: Call Future.traverse to convert all Option of Int into Int")
val futureTraverseResult = Future.traverse(futureOperations){ futureSomeQty =>
futureSomeQty.map(someQty => someQty.getOrElse(0))
}
futureTraverseResult.onComplete {
case Success(results) => println(s"Results $results")
case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")
}