`
spartan1
  • 浏览: 360455 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

akka源码分析

 
阅读更多

看akka源码的一些体会,没有列出源码来。akka代码主要包括两块:底层分发(akka.dispatch包)和上层模型(akka.actor包),从底层线程调度(dispatch)往上看起

 

函数式语言主要处理表达式求值,面向对象语言主要处理对象间消息发送消息。

 

 

1. 底层线程调度

 

Doug Lea: ForkJoinTask

ForkJoinTask是用少数线程执行海量独立任务的极好架构,这里的独立任务指的是任务和任务之间不要有共享数据,否则会有并发访问的问题。

ForkJoinTask的实现包括三个类:

ForkJoinPool: 实现了ExecutorService,提供execute、submit等线程池基本方法,池中的线程都是ForkJoinWorkerThread;

ForkJoinWorkerThread: 继承自Thread,包含了自己的ForkJoin任务队列,在处理完自己任务队列中任务的时候,可以从其他Worker的队列中

偷任务来执行;

ForkJoinTask: 实现了Future接口,可以直接作为ForkJoinPool.submit的返回值,提供的fork方法将自己放到当前Worker线程的任务队列中,

join方法让当前线程等待任务完成,或者通过偷过来等方式自己执行该任务

 

为了性能考虑,这三个类紧耦合,存在大量互相访问成员属性的情况,Doug Lea老先生说,这种比较ugly的实现,能让性能提高四倍,可以每秒

处理10亿级别的ForkJoin任务。

为了处理并发,大量使用了sun.misc.Unsafe类中提供的直接对内存的CAS(compare and swap)原子操作,为了解决可能的乱序执行

导致的问题,整个代码中都充斥着在if条件判断中对变量赋值的操作,感觉就是在看C代码。

 

ForkJoinTask是多核单进程版本的MapReduceJob。

 

 

2. 上层actor模型

 

Actor是用户态定义的类型,用户能够看到的Actor都是从这个类型来的。用户能看到的actor是trait akka.actor.Actor,这个只是actor对外的

一个门面,actor要访问actor系统内部的功能,基本上都要通过ActorContext来访问。

 

ActorCell是actor的内部表示,实现了ActorContext这个trait,所有的功能基本上都是在ActorCell提供的。ActorCell占用64字节。

 

ActorContext是从actor的角度看到的ActorCell的视图,提供了设置接收超时、自身引用、become/unbecome、获取sender引用、

获取children引用列表、获取MessageDispatcher、获取ActorSystem、获取parent引用、watch/unwatch一个actor的方法,

因ActorContext继承了ActorRefFactory,所以也有actorOf、actorFor等创建/获取actorRef的能力。

 

Actor/ActorCell和enipcore的Service/ServiceBase概念一模一样,都是一个是系统外面向用户的,一个是系统内进行调度的。

 

ActorRef是用户看到的对Actor的引用,任何对actor的访问,都是通过ActorRef来的。ActorRef提供了获取path、tell/forward消息的功能

 

实际上内部是使用一个InternalActorRef来表示ActorRef的,InternalActorRef继承自ActorRef,提供了Actor生命周期管理的接口。

LocalActorRef实现了InternalActorRef,是本节点中真正的actorRef实现,其中会创建并启动ActorCell。

 

ActorSystem在创建时,LocalActorRefProvider会创建rootGuardian(根actor),然后rootGuardian下会创建面向用户态actor的

guardian,这两个都是InternalActorRef,是通过直接new LocalActorRef创建出来的,这两个guardian的Actor类都是Guardian。

 

在actor内部创建子actor时,执行的是context.actorOf方法,context实际上就是ActorCell,ActorCell.actorOf调用了

LocalActorRefProvider.actorOf方法,直接new一个LocalActorRef出来,而新创建的LocalActorRef会创建ActorCell,并调用其

start方法,ActorCell.start方法中,将创建mailbox,并向mailbox中发送一个Create系统消息,然后让dispatcher开始调度mailbox

 

执行ActorSystem.actorOf方法创建actor时,实际上向guardian这个Actor发送CreateChild消息,让它创建一个actor。guardian在

收到CreateChild消息时,调用context.actorOf方法创建新actor,这个就与在actor内部创建子actor的做法一样了。

 

 

3. Actor模型和线程模型如何结合

 

MessageQueue实现了入队列enqueue(receiver:ActorRef, handle: Envelope),出队列dequeue():Envelope

SystemMessageQueue提供了systemEnqueue(receiver:ActorRef, message: SystemMessage),全部出队列systemDrain():SystemMessage方法。

其中,Envelope封装了message:Any和sender:ActorRef两个成员,而SystemMessage实际上是一个LinkedList,包含了所有的系统消息。

 

MailBox继承自系统消息队列SystemMessageQueue,实现了Runnable接口,同时包含了一个ActorCell成员,一个MessageQueue成员

MailBox代理了MessageQueue的所有方法。MessageQueue的具体类型,根据MailBoxType的不同而不同,比如UnboundedMailbox将创建ConcurrentLinkedQueue

 

Dispatchers根据ID生成Dispatcher,ActorSystem中有一个默认的dispatcher,dispatcher底层有executor,executor有两种ForkJoinExecutor和

ThreadPoolExecutor,默认是ForkJoinExecutor。

 

另外,scala中的val都是在对象初始化时就执行的

 

3.1 在创建ActorSystem时,初始化默认的dispatcher,使用默认的ForkJoinPool(ExecutorService)

3.2 在使用actorRef ! Message发送消息时,调用了actorRef对应的actorCell.tell方法,其中调用了dispatcher.dispatch方法

    dispatch(akka/dispatch/Dispather.scala)中做了两件事:

      一是将消息放到actorCell的消息队列中(mbox.enqueue(receiver.self, invocation))

      二是调用dispather底层的线程池executor.execute(mbox)(registerForExecution(mbox, true, false))执行mbox.run()方法

        而mbox.run()中,将先从SystemMessage链表中处理系统消息,然后从MessageQueue成员中处理用户消息。处理系统消息时,

        调用actorCell.systemInvoke方法,将所有的系统消息顺序全部处理完;处理用户消息时,调用actorCell.invoke方法,根据dispatcher

        的throughput决定本次处理多少条消息,根据dispatcher的throughputDeadlineTime决定本次处理多长时间,时间长度在处理

        完一条消息后检查一次。

 

    对于ForkJoinPool这种executor,每次执行execute(mbox)时,实际上都是先创建一个继承自ForkJoinTask的MailboxExecutionTask,

    其中的exec方法调用mbox.run方法,因此每次执行都会创建一个ForkJoinTask对象。

 

    还有一点,消息队列都是放到actor对应的mailbox中(以Envelope的形式封装消息本身和sender),而执行的task对象会放到Executor的

    每个线程对应的工作队列中,task和消息分别使用不同的队列。

 

 

4. 定时处理

 

actorSystem在初始化时,会创建scheduler。scheduler内部维护HashedWheelTimer定时器,schedular提供schedule、scheduleOnce等方法,

可以在指定时间之后执行一个task,或者向某个actor发送一个消息。执行task时,使用system.dispatcher执行。

 

schedule主要在状态机FSM、actor.receive接收超时中使用。actor.receive中使用时,首先实现actor.preStart方法,其中调用setReceiveTimeout设置超时时间,在每个receive方法中,需要能够处理ReceiveTimeout事件,如果需要再次超时时,需要再次设置超时事件。只有receive处理完了所有的事件并且设置了超时事件后,超时才会被再次设置

 

内部实现上,actorCell通过调用checkReceiveTimeout方法调用系统scheduler设置一个一次性的超时事件。在actorCell处理Create系统消息时,创建了actor后,首先调用其actor.preStart方法,然后执行checkReceiveTimeout判断是否设置超时。

 

 

5. FSM的实现

 

akka提供了FSM的实现,该实现基于actor模型,提供了状态与状态数据定义、超时等一系列状态机相关的模型和方法

 

 

6. akka如何与耗时系统进行交互,即akka如何与外部系统进行适配(待续)

 

 

7. 在play中的应用(待续)

 

总结:

akka中重点的类都在akka.actor和akka.dispatch两个包中。前者提供了actor模型的抽象和语义,后者提供了底层执行机制。

ActorSystem是系统的控制中心,这里汇聚了用于线程调度的dispatcher,用于定时处理的scheduler,用于创建actor的provider。

dispatcher提供了dispatch/dispatchSystem/execute等多种执行轻量级任务的方法

 

akka中,还有监控(supervise)、Promise/Future、与外部系统交互、Patterns、路由还没有看,暂时不看了。


分享到:
评论
1 楼 rhtwj6231 2013-05-28  
你好,请问有没方法获取MailBox中的队列长度呢?或者通过ActorCell是否可以获取?

相关推荐

    Scala Akka项目源码

    Scala Akka项目源码

    akka-streams-kafka-examples-源码.rar

    akka-streams-kafka-examples-源码.rar

    Learning Akka(PACKT,2015)

    Akka is a distributed computing toolkit that enables developers to build correct concurrent and distributed applications using Java and Scala with ease, applications that scale across servers and ...

    java8集合源码分析-akka-comparison:阿卡比较

    集合源码分析 java 实例来自from Oracle官方并发教程 Thread Objects 线程对象 #####定义并启动一个线程 任务:创建线程打印hello world com.oracle.sec2.thread_objects.HelloThread 定义并启动一个Actor也是很简单...

    Akka.in.Action.2016.9.pdf

    Akka in Action shows you how to build message-oriented systems with Akka. This comprehensive, hands-on tutorial introduces each concept with a working example. You’ll start with the big picture of ...

    AKKA 本质 《Akka Essentials》

    Akka Essentials,学习akka很好的一本书

    akkajava源码-ParallelAndConcurrentCodes:各种并行计算任务的源代码

    akka java源码并行与并发 包含各种并行和多线程概念的源代码,包括: 线程数 固有锁(监控器) 等待,通知 执行者 可调用项和将来的任务 并发集合 原子引用和整数 Akka框架

    Akka 实战 akka in action v13 2014版本

    akka 实战。akka in action。v13 2014新版。 互联网技术入门必备 清晰,非扫描。

    akka-kryo-serialization, 基于Kryo的Akka序列化.zip

    akka-kryo-serialization, 基于Kryo的Akka序列化 akka-kryo-serialization-- Scala 和Akka基于kryo的序列化程序这个库为 Scala 和Akka提供定制的基于kryo的序列化程序。 它可以用于更高效的akka远程处理。它还可以...

    Akka 基础学习pdf中文文档

    如何使用 Akka 来构建具备高容错性、可以横向扩展的分布式网络应用程序。Akka 是一 个强大的工具集,提供了很多选项,可以对在本地机器上处理或网络远程机器上处理的 某项工作进行抽象封装,使之对开发者不可见。...

    Learning Akka

    Learning Akka Learning Akka Learning AkkaLearning Akka

    akka java实现tcp远程调用

    akka实例 java实现tcp远程调用,一个服务端,一个客户端

    Akka入门与实践

    如何使用 Akka 来构建具备高容错性、可以横向扩展的分布式网络应用程序。Akka 是一 个强大的工具集,提供了很多选项,可以对在本地机器上处理或网络远程机器上处理的 某项工作进行抽象封装,使之对开发者不可见。...

    akka实例参考

    初学akka使用实例,有很好的帮助啊,可实际运行

    Akka-in-Action.pdf

    In March 2010 I noticed a tweet by Dean Wampler that made me look into Akka: W00t! RT @jboner: #akka 0.7 is released: http://bit.ly/9yRGSB After some investigation into the source code and building a ...

    akka-data-replication, 在Akka集群中,复制 CRDTs.zip

    akka-data-replication, 在Akka集群中,复制 CRDTs Akka分布式数据这个库的( akka-data-replication ) 已经包含在Akka中,在模块中分发数据。英镑不在/akka-data-replication中维护。 所有 Bug 修复和新功能将在 ...

    akkajava.pdf

    Akka is here to change that. Using the Actor Model we raise the abstraction level and provide a better platform to build scalable, resilient and responsive applications—see the Reactive Manifesto ...

    akka-actor-2.11-2.5.19-API文档-中文版.zip

    赠送jar包:akka-actor_2.11-2.5.19.jar; 赠送原API文档:akka-actor_2.11-2.5.19-javadoc.jar; 赠送源代码:akka-actor_2.11-2.5.19-sources.jar; 赠送Maven依赖信息文件:akka-actor_2.11-2.5.19.pom; 包含...

    Akka Concurrency

    Akka Concurrency

    akka学习入门实践

    akka学习入门实践

Global site tag (gtag.js) - Google Analytics