go - How do I implement session timeout in kafka between 2 apis - Stack Overflow
I have two APIs and two topics. API 1 produces a request to the first topic; API 2 consumes it and produces a response to the second topic, which API 1 then consumes. I want a timeout on API 1's side between producing and consuming: after API 1 produces a message, it should consume the response within 15 minutes.
I tried using a different topic for this, but I would rather not.
Asked 15 hours ago by Kushal Reddy
1 Answer
You can achieve this by leveraging Kafka's message headers, or a state management approach, combined with a timeout mechanism. The full steps are below, with the code I have as an example.
Step 1: When API 1 produces a message to the Kafka topic, include a timestamp in the message headers or the payload to indicate when the message was created.
// Producer (API 1). topicName is assumed to hold the request topic name.
import (
	"time"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)
producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
if err != nil {
	panic(err) // replace with real error handling
}
defer producer.Close()
message := &kafka.Message{
	TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
	Value:          []byte("your-message-payload"),
	Headers:        []kafka.Header{{Key: "timestamp", Value: []byte(time.Now().Format(time.RFC3339))}},
}
_ = producer.Produce(message, nil)
// Produce is asynchronous: wait up to 5 s for delivery before the producer is closed.
producer.Flush(5000)
Step 2: When API 1 consumes the response, validate the elapsed time using the timestamp from the message header. If the message is older than 15 minutes, discard or handle it as timed out.
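// Consumer (API 1). responseTopicName and processMessage are assumed to be defined elsewhere.
import (
	"time"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
	"bootstrap.servers": "localhost:9092",
	"group.id":          "api1-group",
	"auto.offset.reset": "earliest",
})
if err != nil {
	panic(err) // replace with real error handling
}
defer consumer.Close()
_ = consumer.SubscribeTopics([]string{responseTopicName}, nil)
for {
	// A timeout of -1 blocks until a message or an error arrives.
	msg, err := consumer.ReadMessage(-1)
	if err != nil {
		// Handle consumer error
		continue
	}
	// Read the creation time that the producer stored in the "timestamp" header.
	// A missing or unparsable header leaves msgTimestamp at its zero value,
	// so the message is treated as timed out below.
	var msgTimestamp time.Time
	for _, header := range msg.Headers {
		if header.Key == "timestamp" {
			msgTimestamp, _ = time.Parse(time.RFC3339, string(header.Value))
			break
		}
	}
	if time.Since(msgTimestamp) > 15*time.Minute {
		// Handle timeout logic
		continue
	}
	processMessage(msg.Value)
}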
Step 3: To ensure API 2 does not produce stale responses, validate the timestamp from the incoming message before producing a response.
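// API 2. responseTopicName is assumed to be a package-level string variable.
import (
	"time"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)
func produceResponse(message *kafka.Message) {
	// Read the creation time that API 1 stored in the "timestamp" header.
	var msgTimestamp time.Time
	for _, header := range message.Headers {
		if header.Key == "timestamp" {
			msgTimestamp, _ = time.Parse(time.RFC3339, string(header.Value))
			break
		}
	}
	if time.Since(msgTimestamp) > 15*time.Minute {
		// Skip processing and respond with an error or discard
		return
	}
	// Produce valid response.
	// Creating a producer per call keeps the example short; a long-lived producer is preferable.
	producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		return // handle producer creation error
	}
	defer producer.Close()
	response := &kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &responseTopicName, Partition: kafka.PartitionAny},
		Value:          []byte("response-payload"),
		// Copy the request headers so API 1 sees the original timestamp.
		Headers: message.Headers,
	}
	_ = producer.Produce(response, nil)
	// Produce is asynchronous: wait up to 5 s for delivery before the producer is closed.
	producer.Flush(5000)
}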