Files
kafka/consumer/main.go
2023-11-28 17:15:23 +03:00

47 lines
807 B
Go
Executable File

package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/segmentio/kafka-go"
)
// Command-line flags.
var (
	// id is the value of the -id flag (empty by default); it is printed
	// alongside every consumed message so that the output of several
	// consumer processes can be told apart.
	id = flag.String("id", "", "Consumer ID")
)
// main parses the command-line flags and runs the consumer until it is
// interrupted or hits a read error. A read error terminates the process
// with a non-zero status via log.Fatal; a signal-triggered shutdown exits
// normally.
func main() {
	flag.Parse()

	// log.Fatal calls os.Exit, which skips deferred functions. All cleanup
	// therefore lives in run, and log.Fatal is only reached after run has
	// returned and its defers have executed.
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run consumes messages from the "demo" topic as a member of the "my-group"
// consumer group and prints the partition, offset and key of each message.
//
// It returns nil when the process receives SIGTERM or an interrupt signal,
// and a non-nil error when reading from Kafka fails for any other reason.
// The reader is closed on every return path.
func run() error {
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, os.Interrupt)
	defer cancel()

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:        []string{"127.0.0.1:29092"},
		Topic:          "demo",
		GroupID:        "my-group",
		SessionTimeout: 6 * time.Second,
	})
	defer func() {
		if err := reader.Close(); err != nil {
			log.Printf("closing reader: %v", err)
		}
	}()

	for {
		msg, err := reader.ReadMessage(ctx)
		if err != nil {
			// A failure after the context was canceled is the expected
			// result of a shutdown signal, not a real read error.
			if ctx.Err() != nil {
				return nil
			}
			return fmt.Errorf("reading message: %w", err)
		}

		// NOTE(review): "Parition" is a typo for "Partition"; the string is
		// kept unchanged in case something matches on this output.
		fmt.Printf("Consumer: %s Parition: %d Offset: %d Key: %s\n", *id, msg.Partition, msg.Offset, msg.Key)

		// Pause between messages, but wake up immediately on shutdown
		// instead of sleeping through the signal.
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(300 * time.Millisecond):
		}
	}
}