go - How do I implement session timeout in kafka between 2 apis - Stack Overflow

时间: 2025-01-06 admin 业界

I have 2 apis with 2 topics. Api 1 produces to one topic and the other api consumes and this api produces a response to the other topic and api 1 consumes. I want to put a timeout between the producing and consuming of the message on api 1 such that after 15mins of producing a message to the topic from api 1, it should consume it within this time.

I tried using a different topic but I don't want to.

I have 2 apis with 2 topics. Api 1 produces to one topic and the other api consumes and this api produces a response to the other topic and api 1 consumes. I want to put a timeout between the producing and consuming of the message on api 1 such that after 15mins of producing a message to the topic from api 1, it should consume it within this time.

I tried using a different topic but I don't want to.

Share Improve this question asked 15 hours ago Kushal ReddyKushal Reddy 111 bronze badge New contributor Kushal Reddy is a new contributor to this site. Take care in asking for clarification, commenting, and answering. Check out our Code of Conduct.
Add a comment  | 

1 Answer 1

Reset to default -1

You can achieve this by leveraging Kafka's message headers or a state management approach combined with a timeout mechanism. I put full steps, and codes that I have have as an exanple.

Step 1: When API 1 produces a message to the Kafka topic, include a timestamp in the message headers or the payload to indicate when the message was created.

(producer/api1) import ( "time" "github.com/confluentinc/confluent-kafka-go/kafka" )

producer, _ := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
defer producer.Close()

message := &kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
    Value:          []byte("your-message-payload"),
    Headers:        []kafka.Header{{Key: "timestamp", Value: []byte(time.Now().Format(time.RFC3339))}},
}

_ = producer.Produce(message, nil)

Step 2: When API 1 consumes the response, validate the elapsed time using the timestamp from the message header. If the message is older than 15 minutes, discard or handle it as timed out.

import (
    "time"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

consumer, _ := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": "localhost:9092",
    "group.id":          "api1-group",
    "auto.offset.reset": "earliest",
})
defer consumer.Close()

_ = consumer.SubscribeTopics([]string{responseTopicName}, nil)

for {
    msg, err := consumer.ReadMessage(-1)
    if err != nil {
        // Handle consumer error
        continue
    }

    
    var msgTimestamp time.Time
    for _, header := range msg.Headers {
        if header.Key == "timestamp" {
            msgTimestamp, _ = time.Parse(time.RFC3339, string(header.Value))
            break
        }
    }

    if time.Since(msgTimestamp) > 15*time.Minute {
        // Handle timeout logic 
        continue
    }

    
    processMessage(msg.Value)
}

Step 3: To ensure API 2 does not produce stale responses, validate the timestamp from the incoming message before producing a response.

import (
    "time"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func produceResponse(message *kafka.Message) {
    var msgTimestamp time.Time
    for _, header := range message.Headers {
        if header.Key == "timestamp" {
            msgTimestamp, _ = time.Parse(time.RFC3339, string(header.Value))
            break
        }
    }

    if time.Since(msgTimestamp) > 15*time.Minute {
        // Skip processing and respond with an error or discard
        return
    }

    // Produce valid response
    producer, _ := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
    defer producer.Close()

    response := &kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &responseTopicName, Partition: kafka.PartitionAny},
        Value:          []byte("response-payload"),
        Headers:        message.Headers, 
    }

    _ = producer.Produce(response, nil)
}
最新文章