Test environment: ubuntu 15.10 64-bit
    cpu: Intel Core i7-4790 3.60GHz * 8
    memory: 16GB
    disk: ssd 120GB
?????????????rabbmitmq 3.6.0   kafka0.8.1  (???????????????)
????PS: ?????????????????????????????????????????
????????????
    kafka: producer: 37,586 /s  consumer: 448,753 /s
    rabbitmq: producer: 20,807 /s  consumer: 16.413 /s
????????????
????rabbitmq ????4???????????????????????????????????1???????????????1?????????????????1????????????????
????rabbitmq ??????????в?С?????(? 1/20)?????????????????????“tcp????????”
?????????????κ?????
?????????
??????????????kafka?????????rabbitmq????????????????????????2?????????????Э????????????????????????????в????rabbitmq???????Щ????????????????????????????罻?????????????????kafka???????????в??????????????????????????????????????Щ???????????????????
????????
kafka??
package main
import (
"github.com/Shopify/sarama"
"os"
"os/signal"
"sync"
"log"
"time"
)
// main runs the producer benchmark and exits when either the producer
// returns (it stops on os.Interrupt) or the ten-minute time budget elapses.
//
// Waiting on the producer instead of sleeping unconditionally matters because
// the producer registers for os.Interrupt: once it has handled the signal and
// returned, a plain Sleep would keep the process alive with Ctrl-C no longer
// terminating it.
func main() {
	const runFor = 10 * time.Minute

	done := make(chan struct{})
	go func() {
		defer close(done)
		producer()
	}()
	// go consumer()

	select {
	case <-done:
	case <-time.After(runFor):
	}
}
func producer()  {
config :=sarama.NewConfig()
config.Producer.Return.Successes = true
proder??err := sarama.NewAsyncProducer([]string{"localhost:9092"}??config)
if err != nil {
panic(err)
}
signals :=make(chan  os.Signal??1)
signal.Notify(signals??os.Interrupt)
var (
wg                          sync.WaitGroup
enqueued?? successes?? errors int
)
wg.Add(1)
go func() {
defer  wg.Done()
for _=range proder.Successes(){
successes++
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for err := range proder.Errors(){
log.Println(err)
errors++
}
}()
go func() {
t1 := time.NewTicker(time.Second)
for{
<- t1.C
log.Println(enqueued)
}
}()
ProducerLoop:
for{
message :=&sarama.ProducerMessage{Topic:"test"??Value:sarama.StringEncoder("testing 123")}
select {
case proder.Input() <- message:
enqueued++
case <- signals:
proder.AsyncClose()
break ProducerLoop
}
}
wg.Wait()
log.Println("Successfully produced:%d;errors:%d "??successes??errors)
}
func consumer()  {
coner??err := sarama.NewConsumer([]string{"localhost:9092"}??nil)
if err != nil {
panic(err)
}
defer func() {
if err :=coner.Close(); err !=nil{
log.Fatalln(err)
}
}()
partitionConsumer ??err := coner.ConsumePartition("test"??0??sarama.OffsetNewest)
if err != nil {
panic(err)
}
defer func() {
if err := partitionConsumer.Close();err!=nil{
log.Fatalln(err)
}
}()
signals := make(chan os.Signal??1)
signal.Notify(signals??os.Interrupt)
consumed:=0
go func() {
t1 := time.NewTicker(time.Second)
for{
<- t1.C
log.Println(consumed)
}
}()
ConsumerLoop:
for{
select {
case _ = <-partitionConsumer.Messages():
consumed++
//            log.Println( string(msg.Value)??"  =>  "??consumed)
case <-signals:
break ConsumerLoop
}
}
log.Printf("Consumed: %d "?? consumed)
}