Rabbitmq??Kafka???????????
???????????? ???????[ 2016/3/9 13:17:57 ] ??????????????? ???????
?????????????ubuntu 15.10 64λ
????cpu:inter core i7-4790 3.60GHZ * 8
???????:16GB
???????:ssd 120GB
?????????????rabbmitmq 3.6.0 kafka0.8.1 (???????????????)
????PS: ?????????????????????????????????????????
????????????
????kafka :???????: 37??586 /s ???????: 448??753 /s
????rabbitmq: ???????: 20??807 /s ??????? 16.413 /s
????????????
????rabbitmq ????4???????????????????????????????????1???????????????1?????????????????1????????????????
????rabbitmq ??????????в?С?????(? 1/20)?????????????????????“tcp????????”
?????????????κ?????
?????????
??????????????kafka?????????rabbitmq????????????????????????2?????????????Э????????????????????????????в????rabbitmq???????Щ????????????????????????????罻?????????????????kafka???????????в??????????????????????????????????????Щ???????????????????
????????
kafka??
package main
import (
"github.com/Shopify/sarama"
"os"
"os/signal"
"sync"
"log"
"time"
)
func main() {
go producer()
// go consumer()
time.Sleep(10*time.Minute)
}
func producer() {
config :=sarama.NewConfig()
config.Producer.Return.Successes = true
proder??err := sarama.NewAsyncProducer([]string{"localhost:9092"}??config)
if err != nil {
panic(err)
}
signals :=make(chan os.Signal??1)
signal.Notify(signals??os.Interrupt)
var (
wg sync.WaitGroup
enqueued?? successes?? errors int
)
wg.Add(1)
go func() {
defer wg.Done()
for _=range proder.Successes(){
successes++
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for err := range proder.Errors(){
log.Println(err)
errors++
}
}()
go func() {
t1 := time.NewTicker(time.Second)
for{
<- t1.C
log.Println(enqueued)
}
}()
ProducerLoop:
for{
message :=&sarama.ProducerMessage{Topic:"test"??Value:sarama.StringEncoder("testing 123")}
select {
case proder.Input() <- message:
enqueued++
case <- signals:
proder.AsyncClose()
break ProducerLoop
}
}
wg.Wait()
log.Println("Successfully produced:%d;errors:%d
"??successes??errors)
}
func consumer() {
coner??err := sarama.NewConsumer([]string{"localhost:9092"}??nil)
if err != nil {
panic(err)
}
defer func() {
if err :=coner.Close(); err !=nil{
log.Fatalln(err)
}
}()
partitionConsumer ??err := coner.ConsumePartition("test"??0??sarama.OffsetNewest)
if err != nil {
panic(err)
}
defer func() {
if err := partitionConsumer.Close();err!=nil{
log.Fatalln(err)
}
}()
signals := make(chan os.Signal??1)
signal.Notify(signals??os.Interrupt)
consumed:=0
go func() {
t1 := time.NewTicker(time.Second)
for{
<- t1.C
log.Println(consumed)
}
}()
ConsumerLoop:
for{
select {
case _ = <-partitionConsumer.Messages():
consumed++
// log.Println( string(msg.Value)??" => "??consumed)
case <-signals:
break ConsumerLoop
}
}
log.Printf("Consumed: %d
"?? consumed)
}
??????
???·???
??????????????????
2023/3/23 14:23:39???д?ò??????????
2023/3/22 16:17:39????????????????????Щ??
2022/6/14 16:14:27??????????????????????????
2021/10/18 15:37:44???????????????
2021/9/17 15:19:29???·???????·
2021/9/14 15:42:25?????????????
2021/5/28 17:25:47??????APP??????????
2021/5/8 17:01:11