一:介绍
1.介绍
默认情况是,Spout每获取一条数据,封装后发送给后面的组件,不再管后面是否处理完成或成功接收,不再考虑。
这种的情况是不用太精确,没有启用可靠性消息机制。
2.方面的体现
spout组件
bolt组件
ack机制
二:Spout组件
1.发送Tuple,给每个tuple设置一个msgId(用来标识、追踪)
重载emit()方法。
2.使用内存缓存已经发射过的tuple
首先新建缓存
在open中初始化缓存
保存到缓存
3.ack方法(成功)
如果消息发送成功后的处理
4.fail的处理
在conf(map类型)中设置最大的重发次数,方面后面的进行获取
在spout类中的初始化中获得次数
新建已经重发的次数的内存(在初始化的时候初始化即可)
fail方法
这里一开始有一个hasSendTuples.containKey(msgId)的判断,因为后面后面需要在这个Map中取值,所以需要先判断。
三:测试spout
1.超时时间的设置
设置的是1秒。
意思是1秒内后面没有处理,就是说明发送失败。
2.测试结果
因为在Bolt没有处理成功的Tuple,所以,在这里显示的只会是失败的返回信息。
四:Bolt设置
1.确认ack
2.构造ack树(锚定)
就是一个Tuple衍生的Tuple都成功,才表示这个tuple被处理成功。
接受过来的input,与将要发送的new Values(word)绑定起来。
五:测试
1.超时时间设定
使用默认时间,30秒
2.结果
六:Acker组件
1.介绍
这个组件与Spout与Bolt同级。
在使用消息可靠性保证的时候,的确认组件。
也是使用executor来进行执行。
2.注意
当启用消息可靠性保障机制的时候,运行性能明显下降。
调整acker组件运行的executor线程数量来优化。
config.setNumAckers(4);
这样就是每个Spout或者Bolt一个execute、
但是具体是几个,需要进行测试。