Files
kafka/producer/main.go
2023-11-28 17:15:23 +03:00

47 lines
847 B
Go
Executable File

package main
import (
"context"
"crypto/rand"
"flag"
"log"
"strconv"
"github.com/segmentio/kafka-go"
)
var async = flag.Bool("a", false, "use async")
func main() {
flag.Parse()
ctx := context.Background()
logger := log.Default()
writer := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"127.0.0.1:29092", "127.0.0.1:39092", "127.0.0.1:49092"},
Topic: "demo",
Async: *async,
Logger: kafka.LoggerFunc(logger.Printf),
ErrorLogger: kafka.LoggerFunc(logger.Printf),
BatchSize: 2000,
// CompressionCodec: &compress.Lz4Codec,
Balancer: &SimpleBalancer{},
})
defer writer.Close()
m := make([]byte, 500)
rand.Read(m)
for i := 0; i < 524_288; i++ {
err := writer.WriteMessages(ctx, kafka.Message{Key: []byte(strconv.Itoa(i)), Value: m})
if err != nil {
log.Fatal(err)
}
}
}