December 1, 2024 Go Telegram Gemini Ai Chatbot Routing Multimodal
Building a Telegram bot powered by Google’s Gemini AI that can handle text conversations, web searches, YouTube videos, image generation, voice input, and voice output. We’ll explore an intelligent routing architecture that efficiently directs requests to specialized models.
Modern AI assistants need to handle diverse user requests: answering questions, searching the web, generating images, processing voice messages, and more. Instead of using a single large model for everything, we can implement an intelligent routing system that identifies the request type and routes it to specialized handlers.
The challenge with monolithic AI systems is that they often waste computational resources. A simple “Hello, how are you?” doesn’t need the same processing power as a complex web search query or image generation request. By implementing intelligent routing, we can optimize both cost and response time while maintaining high-quality responses for all request types.
The core idea is a two-stage routing system:
This approach optimizes costs and latency by only invoking specialized models when needed. For simple chat requests, we skip the second stage entirely.
The routing model acts as an intelligent dispatcher. It analyzes the user’s intent using natural language understanding and determines the most appropriate handler. This decision-making process happens quickly using a lightweight model (like Gemini Flash), which keeps the overhead minimal. Once the routing decision is made, the system can either respond directly (for chat) or invoke specialized services that are optimized for specific tasks.
The beauty of this architecture lies in its efficiency. Consider a user asking “What’s the weather today?” — the router identifies this as a search request, routes it to a Gemini model with Google Search enabled, and returns current information. Meanwhile, a conversational question like “Explain quantum computing” goes directly to the chat model without any search overhead.
graph TD
A[User Message] --> B[Telegram Bot]
B --> C{Router Model}
C -->|Chat| D[Chat Response]
C -->|Search| E[Google Search]
C -->|YouTube| F[YouTube Handler]
C -->|Image| G[Image Generation]
C -->|Voice Input| H[Voice Transcription]
C -->|Voice Output| I[Text-to-Speech]
E --> J[Specialized Model]
F --> J
G --> J
H --> J
I --> J
J --> K[Formatted Response]
K --> B
D --> B
B --> L[User]
Let’s start by examining the project structure:
gemini-telegram-chat/
├── cmd/
│ └── bot/
│ └── main.go
├── internal/
│ ├── bot/
│ │ ├── handler.go
│ │ └── router.go
│ ├── gemini/
│ │ ├── client.go
│ │ ├── router.go
│ │ └── models.go
│ ├── services/
│ │ ├── search.go
│ │ ├── youtube.go
│ │ ├── image.go
│ │ ├── voice.go
│ │ └── tts.go
│ └── config/
│ └── config.go
├── go.mod
└── go.sum
Understanding the components is crucial for building this system. Each component has a specific role in the routing and processing pipeline. Let’s examine them one by one, starting with the fundamental data structures that define how requests flow through our system.
The routing system needs a clear taxonomy of request types. This classification enables the router to make informed decisions about which handler to invoke. First, let’s define the request types our router can identify:
// internal/gemini/models.go
package gemini
type RequestType string
const (
RequestTypeChat RequestType = "chat"
RequestTypeSearch RequestType = "search"
RequestTypeYouTube RequestType = "youtube"
RequestTypeImage RequestType = "image"
RequestTypeVoiceIn RequestType = "voice_input"
RequestTypeVoiceOut RequestType = "voice_output"
)
type RoutingDecision struct {
Type RequestType `json:"type"`
Query string `json:"query"`
Confidence float64 `json:"confidence"`
Parameters map[string]interface{} `json:"parameters,omitempty"`
}
The router is the heart of our intelligent routing system. It uses Gemini’s structured output capabilities to classify requests. This is where the magic happens — the router analyzes natural language input and transforms it into structured routing decisions.
The router uses Gemini Flash, a fast and efficient model perfect for classification tasks. We configure it with low temperature (0.1) to ensure consistent, deterministic routing decisions. The structured output format (JSON) allows us to reliably parse the routing decision and extract parameters needed by specialized handlers.
The routing prompt is carefully crafted to help the model understand context. For example, if a user sends a voice message, we include that information in the prompt so the router can make appropriate decisions. The prompt also includes examples and clear instructions about what each request type means, helping the model make accurate classifications.
// internal/gemini/router.go
package gemini
import (
"context"
"encoding/json"
"fmt"
"github.com/google/generative-ai-go/genai"
"google.golang.org/api/option"
)
type Router struct {
client *genai.Client
model *genai.GenerativeModel
}
func NewRouter(apiKey string) (*Router, error) {
ctx := context.Background()
client, err := genai.NewClient(ctx, option.WithAPIKey(apiKey))
if err != nil {
return nil, fmt.Errorf("failed to create Gemini client: %w", err)
}
model := client.GenerativeModel("gemini-1.5-flash")
// Configure for structured output
model.GenerationConfig = &genai.GenerationConfig{
Temperature: 0.1, // Low temperature for consistent classification
ResponseMIMEType: "application/json",
}
return &Router{
client: client,
model: model,
}, nil
}
func (r *Router) Route(ctx context.Context, userMessage string, hasVoice bool) (*RoutingDecision, error) {
prompt := r.buildRoutingPrompt(userMessage, hasVoice)
resp, err := r.model.GenerateContent(ctx, genai.Text(prompt))
if err != nil {
return nil, fmt.Errorf("failed to generate routing decision: %w", err)
}
if len(resp.Candidates) == 0 || len(resp.Candidates[0].Content.Parts) == 0 {
return nil, fmt.Errorf("empty response from router")
}
// Extract JSON from response
jsonStr := fmt.Sprintf("%v", resp.Candidates[0].Content.Parts[0])
var decision RoutingDecision
if err := json.Unmarshal([]byte(jsonStr), &decision); err != nil {
return nil, fmt.Errorf("failed to parse routing decision: %w", err)
}
return &decision, nil
}
func (r *Router) buildRoutingPrompt(userMessage string, hasVoice bool) string {
voiceHint := ""
if hasVoice {
voiceHint = "The user sent a voice message. "
}
return fmt.Sprintf(`Analyze the following user message and determine the request type.
%s
User message: "%s"
Classify the request into one of these types:
- "chat": General conversation, questions, explanations
- "search": Requests for current information, news, facts that need web search
- "youtube": Requests to watch, find, or get information about YouTube videos
- "image": Requests to generate, create, or draw images
- "voice_input": User wants to send voice messages (only if explicitly requested)
- "voice_output": User wants to receive voice responses (only if explicitly requested)
Return a JSON object with this structure:
{
"type": "one of the types above",
"query": "extracted search query or main intent",
"confidence": 0.0-1.0,
"parameters": {}
}
For YouTube requests, include video_id or search_query in parameters.
For image requests, include style, size, or other generation parameters.
For voice requests, include language preferences.
Response:`, voiceHint, userMessage)
}
func (r *Router) Close() error {
return r.client.Close()
}
While the router uses a lightweight model for classification, specialized tasks often require more powerful models. For these cases, we create a flexible client that can work with different Gemini models depending on the task complexity.
The client provides a clean interface for generating text responses. It can work with context data, which is particularly useful when we need to synthesize information from multiple sources. For example, when handling search requests, we first perform a web search, then use this client to generate a comprehensive answer based on the search results.
The temperature setting (0.7) is higher than the router’s, allowing for more creative and varied responses. This is appropriate for conversational tasks where we want natural, engaging dialogue rather than deterministic classification.
// internal/gemini/client.go
package gemini
import (
"context"
"fmt"
"github.com/google/generative-ai-go/genai"
"google.golang.org/api/option"
)
type Client struct {
client *genai.Client
model *genai.GenerativeModel
}
func NewClient(apiKey, modelName string) (*Client, error) {
ctx := context.Background()
client, err := genai.NewClient(ctx, option.WithAPIKey(apiKey))
if err != nil {
return nil, fmt.Errorf("failed to create Gemini client: %w", err)
}
model := client.GenerativeModel(modelName)
model.GenerationConfig = &genai.GenerationConfig{
Temperature: 0.7,
}
return &Client{
client: client,
model: model,
}, nil
}
func (c *Client) GenerateText(ctx context.Context, prompt string) (string, error) {
resp, err := c.model.GenerateContent(ctx, genai.Text(prompt))
if err != nil {
return "", fmt.Errorf("failed to generate content: %w", err)
}
if len(resp.Candidates) == 0 || len(resp.Candidates[0].Content.Parts) == 0 {
return "", fmt.Errorf("empty response")
}
return fmt.Sprintf("%v", resp.Candidates[0].Content.Parts[0]), nil
}
func (c *Client) GenerateWithContext(ctx context.Context, prompt string, contextData string) (string, error) {
fullPrompt := fmt.Sprintf("Context:\n%s\n\nUser request:\n%s", contextData, prompt)
return c.GenerateText(ctx, fullPrompt)
}
func (c *Client) Close() error {
return c.client.Close()
}
Now we’ll dive into the specialized services that handle different types of requests. Each service is designed to be independent and focused on a single responsibility. This modular approach makes the system easier to maintain, test, and extend.
One of the most powerful features of Gemini models is their ability to access real-time information through Google Search. Instead of making separate API calls to Google’s Custom Search API, we can leverage Gemini models that have Google Search enabled. This approach is more efficient and provides better integration with the AI’s understanding capabilities.
Gemini models with Google Search enabled can automatically search the web when they detect that a query requires current information. The model intelligently decides when to search, what to search for, and how to synthesize the results into a coherent answer. This eliminates the need for manual search query construction and result parsing.
When the router identifies a search request, we use a specialized Gemini model (like gemini-1.5-pro or gemini-1.5-flash-latest) that has Google Search capabilities enabled. The model handles the entire search process internally, making API calls to Google Search as needed, and returns a synthesized answer based on the search results.
// internal/services/search.go
package services
import (
"context"
"fmt"
"github.com/google/generative-ai-go/genai"
"google.golang.org/api/option"
)
type SearchService struct {
client *genai.Client
model *genai.GenerativeModel
}
func NewSearchService(apiKey string) (*SearchService, error) {
ctx := context.Background()
client, err := genai.NewClient(ctx, option.WithAPIKey(apiKey))
if err != nil {
return nil, fmt.Errorf("failed to create Gemini client: %w", err)
}
// Use a Gemini model with Google Search enabled
// Models like gemini-1.5-pro-latest or gemini-1.5-flash-latest
// have built-in Google Search capabilities through grounding
model := client.GenerativeModel("gemini-1.5-pro-latest")
// Configure generation settings
model.GenerationConfig = &genai.GenerationConfig{
Temperature: 0.7,
}
// Enable Google Search grounding
// This allows the model to search the web when needed
model.Tools = []*genai.Tool{
{GoogleSearchRetrieval: &genai.GoogleSearchRetrieval{}},
}
return &SearchService{
client: client,
model: model,
}, nil
}
func (s *SearchService) Search(ctx context.Context, query string) (string, error) {
// Build a prompt that encourages the model to search
prompt := fmt.Sprintf(`Answer the following question using current information from the web.
If you need to search for current information, use Google Search to find the most up-to-date results.
Question: %s
Provide a comprehensive answer based on the search results. Include relevant details and cite sources when possible.`, query)
resp, err := s.model.GenerateContent(ctx, genai.Text(prompt))
if err != nil {
return "", fmt.Errorf("failed to generate search response: %w", err)
}
if len(resp.Candidates) == 0 || len(resp.Candidates[0].Content.Parts) == 0 {
return "", fmt.Errorf("empty response from search")
}
// Extract the response text
responseText := fmt.Sprintf("%v", resp.Candidates[0].Content.Parts[0])
// Check if the model used Google Search (grounding metadata)
// The GroundingMetadata contains information about which sources were used
// and can include citations that you can append to the response
if resp.Candidates[0].GroundingMetadata != nil {
groundingMeta := resp.Candidates[0].GroundingMetadata
// You can access grounding chunks and web search citations here
// For example: groundingMeta.GroundingChunks contains search results
// This allows you to provide source attribution to users
}
return responseText, nil
}
func (s *SearchService) Close() error {
return s.client.Close()
}
YouTube integration allows users to search for videos directly through the bot. This is particularly useful when users want to find tutorials, music, or any video content. The YouTube Data API v3 provides comprehensive search capabilities, allowing us to find videos by keywords, filter by various criteria, and retrieve detailed metadata.
The service handles the complexity of YouTube’s API, including authentication, request formatting, and response parsing. It returns structured video information including titles, channel names, thumbnails, and direct links. This information is then formatted in a user-friendly way for Telegram, making it easy for users to discover and access video content.
// internal/services/youtube.go
package services
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"
)
type YouTubeService struct {
apiKey string
client *http.Client
}
func NewYouTubeService(apiKey string) *YouTubeService {
return &YouTubeService{
apiKey: apiKey,
client: &http.Client{
Timeout: 10 * time.Second,
},
}
}
type YouTubeVideo struct {
VideoID string `json:"videoId"`
Title string `json:"title"`
Channel string `json:"channelTitle"`
Thumbnail string `json:"thumbnail"`
Duration string `json:"duration"`
}
type YouTubeSearchResponse struct {
Items []struct {
ID struct {
VideoID string `json:"videoId"`
} `json:"id"`
Snippet struct {
Title string `json:"title"`
ChannelName string `json:"channelTitle"`
Thumbnails struct {
Default struct {
URL string `json:"url"`
} `json:"default"`
} `json:"thumbnails"`
} `json:"snippet"`
} `json:"items"`
}
func (y *YouTubeService) SearchVideos(ctx context.Context, query string, maxResults int) ([]YouTubeVideo, error) {
baseURL := "https://www.googleapis.com/youtube/v3/search"
params := url.Values{}
params.Set("key", y.apiKey)
params.Set("part", "snippet")
params.Set("q", query)
params.Set("type", "video")
params.Set("maxResults", fmt.Sprintf("%d", maxResults))
reqURL := fmt.Sprintf("%s?%s", baseURL, params.Encode())
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
resp, err := y.client.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to execute request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("YouTube API error: %s - %s", resp.Status, string(body))
}
var searchResp YouTubeSearchResponse
if err := json.NewDecoder(resp.Body).Decode(&searchResp); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}
videos := make([]YouTubeVideo, 0, len(searchResp.Items))
for _, item := range searchResp.Items {
videos = append(videos, YouTubeVideo{
VideoID: item.ID.VideoID,
Title: item.Snippet.Title,
Channel: item.Snippet.ChannelName,
Thumbnail: item.Snippet.Thumbnails.Default.URL,
})
}
return videos, nil
}
func (y *YouTubeService) FormatVideos(videos []YouTubeVideo) string {
if len(videos) == 0 {
return "No videos found."
}
formatted := "📺 YouTube Videos:\n\n"
for i, video := range videos {
videoURL := fmt.Sprintf("https://www.youtube.com/watch?v=%s", video.VideoID)
formatted += fmt.Sprintf("%d. **%s**\n", i+1, video.Title)
formatted += fmt.Sprintf(" Channel: %s\n", video.Channel)
formatted += fmt.Sprintf(" %s\n\n", videoURL)
}
return formatted
}
Image generation is one of the more complex features, requiring integration with Google’s Imagen models. When users request image generation, the router extracts the image prompt and any style parameters, then passes them to the image generation service.
The service needs to handle various parameters like style preferences, image dimensions, and artistic direction. While the implementation shown here is conceptual (as Imagen API integration details may vary), it demonstrates the pattern for handling image generation requests. In a production system, you would integrate with the actual Imagen API or use Gemini’s image generation capabilities if available.
The generated images are saved temporarily and sent to the user via Telegram’s photo or document sending capabilities. Proper cleanup of temporary files is essential to prevent disk space issues.
// internal/services/image.go
package services
import (
"context"
"fmt"
"github.com/google/generative-ai-go/genai"
"google.golang.org/api/option"
)
type ImageService struct {
client *genai.Client
model *genai.GenerativeModel
}
func NewImageService(apiKey string) (*ImageService, error) {
ctx := context.Background()
client, err := genai.NewClient(ctx, option.WithAPIKey(apiKey))
if err != nil {
return nil, fmt.Errorf("failed to create Gemini client: %w", err)
}
// Use Imagen 3 for image generation
model := client.GenerativeModel("imagen-3")
return &ImageService{
client: client,
model: model,
}, nil
}
func (i *ImageService) GenerateImage(ctx context.Context, prompt string, params map[string]interface{}) ([]byte, error) {
// Note: Actual implementation depends on Imagen API
// This is a conceptual implementation
// Build enhanced prompt with parameters
enhancedPrompt := i.buildPrompt(prompt, params)
// Generate image using Imagen
// The actual API call would go here
// For now, we'll return a placeholder
return nil, fmt.Errorf("image generation not fully implemented - requires Imagen API setup")
}
func (i *ImageService) buildPrompt(basePrompt string, params map[string]interface{}) string {
enhanced := basePrompt
if style, ok := params["style"].(string); ok {
enhanced = fmt.Sprintf("%s, style: %s", enhanced, style)
}
if size, ok := params["size"].(string); ok {
enhanced = fmt.Sprintf("%s, size: %s", enhanced, size)
}
return enhanced
}
func (i *ImageService) Close() error {
return i.client.Close()
}
Voice input processing transforms spoken messages into text that can be processed by the AI system. Telegram sends voice messages in OGG format, which needs to be converted to a format compatible with Google Cloud Speech-to-Text API.
The conversion process uses ffmpeg to transform the audio file into the required format (16kHz, mono, WAV). This preprocessing step is crucial because the Speech API has specific requirements for audio encoding. Once converted, the audio data is sent to Google Cloud Speech-to-Text, which uses advanced machine learning models to transcribe the speech accurately.
The transcription service supports multiple languages and can handle various audio qualities. It returns the transcribed text, which is then processed as if it were a regular text message. This allows users to interact with the bot naturally using voice, making the experience more accessible and convenient.
// internal/services/voice.go
package services
import (
"context"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"time"
"cloud.google.com/go/speech/apiv1"
"cloud.google.com/go/speech/apiv1/speechpb"
"google.golang.org/api/option"
)
type VoiceService struct {
client *speech.Client
}
func NewVoiceService(apiKey string) (*VoiceService, error) {
ctx := context.Background()
client, err := speech.NewClient(ctx, option.WithAPIKey(apiKey))
if err != nil {
return nil, fmt.Errorf("failed to create Speech client: %w", err)
}
return &VoiceService{
client: client,
}, nil
}
func (v *VoiceService) TranscribeAudio(ctx context.Context, audioData []byte, languageCode string) (string, error) {
// Create recognition config
config := &speechpb.RecognitionConfig{
Encoding: speechpb.RecognitionConfig_LINEAR16,
SampleRateHertz: 16000,
LanguageCode: languageCode,
AudioChannelCount: 1,
}
audio := &speechpb.RecognitionAudio{
AudioSource: &speechpb.RecognitionAudio_Content{Content: audioData},
}
// Perform recognition
resp, err := v.client.Recognize(ctx, &speechpb.RecognizeRequest{
Config: config,
Audio: audio,
})
if err != nil {
return "", fmt.Errorf("failed to recognize speech: %w", err)
}
if len(resp.Results) == 0 {
return "", fmt.Errorf("no transcription results")
}
// Combine all alternatives
var transcript string
for _, result := range resp.Results {
for _, alt := range result.Alternatives {
transcript += alt.Transcript + " "
}
}
return transcript, nil
}
func (v *VoiceService) TranscribeTelegramVoice(ctx context.Context, voiceFilePath string) (string, error) {
// Convert OGG to WAV using ffmpeg
wavPath := filepath.Join(os.TempDir(), fmt.Sprintf("voice_%d.wav", time.Now().Unix()))
defer os.Remove(wavPath)
cmd := exec.CommandContext(ctx, "ffmpeg", "-i", voiceFilePath, "-ar", "16000", "-ac", "1", "-f", "wav", wavPath)
if err := cmd.Run(); err != nil {
return "", fmt.Errorf("failed to convert audio: %w", err)
}
// Read WAV file
audioData, err := os.ReadFile(wavPath)
if err != nil {
return "", fmt.Errorf("failed to read audio file: %w", err)
}
return v.TranscribeAudio(ctx, audioData, "en-US")
}
func (v *VoiceService) Close() error {
return v.client.Close()
}
Text-to-speech (TTS) enables the bot to respond with voice messages, creating a more natural conversational experience. Google Cloud Text-to-Speech API provides high-quality neural voices that sound natural and expressive.
The TTS service converts text responses into audio files. Users can request voice responses explicitly, or the system can be configured to automatically use voice for certain types of responses. The service supports multiple languages and voice types, allowing for customization based on user preferences or context.
The generated audio is saved as an OGG Opus file, which is Telegram’s preferred format for voice messages. This ensures compatibility and optimal quality. The file is sent to the user and then cleaned up to manage storage efficiently.
// internal/services/tts.go
package services
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"time"
"cloud.google.com/go/texttospeech/apiv1"
"cloud.google.com/go/texttospeech/apiv1/texttospeechpb"
"google.golang.org/api/option"
)
type TTSService struct {
client *texttospeech.Client
}
func NewTTSService(apiKey string) (*TTSService, error) {
ctx := context.Background()
client, err := texttospeech.NewClient(ctx, option.WithAPIKey(apiKey))
if err != nil {
return nil, fmt.Errorf("failed to create TTS client: %w", err)
}
return &TTSService{
client: client,
}, nil
}
func (t *TTSService) SynthesizeSpeech(ctx context.Context, text string, languageCode string, voiceName string) ([]byte, error) {
req := &texttospeechpb.SynthesizeSpeechRequest{
Input: &texttospeechpb.SynthesisInput{
InputSource: &texttospeechpb.SynthesisInput_Text{Text: text},
},
Voice: &texttospeechpb.VoiceSelectionParams{
LanguageCode: languageCode,
Name: voiceName,
SsmlGender: texttospeechpb.SsmlVoiceGender_NEUTRAL,
},
AudioConfig: &texttospeechpb.AudioConfig{
AudioEncoding: texttospeechpb.AudioEncoding_OGG_OPUS,
SampleRateHertz: 24000,
},
}
resp, err := t.client.SynthesizeSpeech(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to synthesize speech: %w", err)
}
return resp.AudioContent, nil
}
func (t *TTSService) SynthesizeToFile(ctx context.Context, text string, languageCode string, voiceName string) (string, error) {
audioData, err := t.SynthesizeSpeech(ctx, text, languageCode, voiceName)
if err != nil {
return "", err
}
// Save to temporary file
outputPath := filepath.Join(os.TempDir(), fmt.Sprintf("tts_%d.ogg", time.Now().Unix()))
if err := os.WriteFile(outputPath, audioData, 0644); err != nil {
return "", fmt.Errorf("failed to write audio file: %w", err)
}
return outputPath, nil
}
func (t *TTSService) Close() error {
return t.client.Close()
}
The bot handler is the orchestrator that ties all components together. It receives updates from Telegram, processes them through the routing system, and coordinates the appropriate services to generate responses. This is where the routing architecture comes to life.
The handler manages the lifecycle of all services, ensuring they’re properly initialized and cleaned up. It handles different message types (text, voice, images) and routes them through the appropriate processing pipeline. Error handling is crucial here, as the handler needs to gracefully handle failures in any service and provide meaningful feedback to users.
The handler also manages context propagation, which is essential for request cancellation and timeout handling. When a user sends a message, the handler creates a context that flows through all service calls, allowing the system to cancel operations if needed (for example, if the user sends another message before the first one completes).
// internal/bot/handler.go
package bot
import (
"context"
"fmt"
"io"
"log"
"os"
"path/filepath"
"github.com/go-telegram-bot-api/telegram-bot-api/v5"
"your-project/internal/gemini"
"your-project/internal/services"
)
type Handler struct {
bot *tgbotapi.BotAPI
router *gemini.Router
geminiClient *gemini.Client
searchService *services.SearchService
youtubeService *services.YouTubeService
imageService *services.ImageService
voiceService *services.VoiceService
ttsService *services.TTSService
}
func NewHandler(botToken string, config *Config) (*Handler, error) {
bot, err := tgbotapi.NewBotAPI(botToken)
if err != nil {
return nil, fmt.Errorf("failed to create bot: %w", err)
}
router, err := gemini.NewRouter(config.GeminiAPIKey)
if err != nil {
return nil, fmt.Errorf("failed to create router: %w", err)
}
geminiClient, err := gemini.NewClient(config.GeminiAPIKey, "gemini-1.5-pro")
if err != nil {
return nil, fmt.Errorf("failed to create Gemini client: %w", err)
}
searchService, err := services.NewSearchService(config.GeminiAPIKey)
if err != nil {
return nil, fmt.Errorf("failed to create search service: %w", err)
}
youtubeService := services.NewYouTubeService(config.YouTubeAPIKey)
imageService, err := services.NewImageService(config.GeminiAPIKey)
if err != nil {
log.Printf("Warning: Image service not available: %v", err)
}
voiceService, err := services.NewVoiceService(config.GoogleCloudAPIKey)
if err != nil {
log.Printf("Warning: Voice service not available: %v", err)
}
ttsService, err := services.NewTTSService(config.GoogleCloudAPIKey)
if err != nil {
log.Printf("Warning: TTS service not available: %v", err)
}
return &Handler{
bot: bot,
router: router,
geminiClient: geminiClient,
searchService: searchService,
youtubeService: youtubeService,
imageService: imageService,
voiceService: voiceService,
ttsService: ttsService,
}, nil
}
func (h *Handler) HandleUpdate(ctx context.Context, update tgbotapi.Update) {
if update.Message == nil {
return
}
msg := update.Message
chatID := msg.Chat.ID
// Handle voice messages
if msg.Voice != nil {
h.handleVoiceMessage(ctx, msg)
return
}
// Handle text messages
if msg.Text == "" {
return
}
userMessage := msg.Text
log.Printf("Received message from %d: %s", chatID, userMessage)
// Step 1: Route the request
decision, err := h.router.Route(ctx, userMessage, false)
if err != nil {
h.sendMessage(chatID, "Sorry, I encountered an error processing your request.")
log.Printf("Routing error: %v", err)
return
}
log.Printf("Routing decision: type=%s, confidence=%.2f", decision.Type, decision.Confidence)
// Step 2: Process based on routing decision
var response string
var voiceFile string
switch decision.Type {
case gemini.RequestTypeChat:
// Direct chat - no specialized model needed
response, err = h.geminiClient.GenerateText(ctx, userMessage)
case gemini.RequestTypeSearch:
response, err = h.handleSearch(ctx, decision.Query)
case gemini.RequestTypeYouTube:
response, err = h.handleYouTube(ctx, decision)
case gemini.RequestTypeImage:
response, voiceFile, err = h.handleImageGeneration(ctx, decision)
case gemini.RequestTypeVoiceOut:
response, voiceFile, err = h.handleVoiceOutput(ctx, userMessage)
default:
response, err = h.geminiClient.GenerateText(ctx, userMessage)
}
if err != nil {
h.sendMessage(chatID, fmt.Sprintf("Error: %v", err))
log.Printf("Processing error: %v", err)
return
}
// Send response
if voiceFile != "" {
h.sendVoice(chatID, voiceFile)
os.Remove(voiceFile) // Cleanup
} else {
h.sendMessage(chatID, response)
}
}
func (h *Handler) handleVoiceMessage(ctx context.Context, msg *tgbotapi.Message) {
chatID := msg.Chat.ID
// Download voice file
file, err := h.bot.GetFile(tgbotapi.FileConfig{FileID: msg.Voice.FileID})
if err != nil {
h.sendMessage(chatID, "Failed to download voice message.")
return
}
voicePath := filepath.Join(os.TempDir(), fmt.Sprintf("voice_%s.ogg", msg.Voice.FileID))
if err := h.downloadFile(file.FilePath, voicePath); err != nil {
h.sendMessage(chatID, "Failed to download voice message.")
return
}
defer os.Remove(voicePath)
// Transcribe voice
transcript, err := h.voiceService.TranscribeTelegramVoice(ctx, voicePath)
if err != nil {
h.sendMessage(chatID, "Failed to transcribe voice message.")
return
}
h.sendMessage(chatID, fmt.Sprintf("Transcribed: %s", transcript))
// Process transcribed text
decision, err := h.router.Route(ctx, transcript, true)
if err != nil {
return
}
// Continue processing as text message...
}
func (h *Handler) handleSearch(ctx context.Context, query string) (string, error) {
// Use Gemini model with Google Search enabled
// The model automatically searches the web and synthesizes the answer
answer, err := h.searchService.Search(ctx, query)
if err != nil {
return "", fmt.Errorf("search failed: %w", err)
}
return answer, nil
}
func (h *Handler) handleYouTube(ctx context.Context, decision *gemini.RoutingDecision) (string, error) {
query := decision.Query
if searchQuery, ok := decision.Parameters["search_query"].(string); ok {
query = searchQuery
}
videos, err := h.youtubeService.SearchVideos(ctx, query, 5)
if err != nil {
return "", fmt.Errorf("YouTube search failed: %w", err)
}
return h.youtubeService.FormatVideos(videos), nil
}
func (h *Handler) handleImageGeneration(ctx context.Context, decision *gemini.RoutingDecision) (string, string, error) {
prompt := decision.Query
if imgPrompt, ok := decision.Parameters["image_prompt"].(string); ok {
prompt = imgPrompt
}
imageData, err := h.imageService.GenerateImage(ctx, prompt, decision.Parameters)
if err != nil {
return "", "", err
}
// Save image temporarily
imagePath := filepath.Join(os.TempDir(), fmt.Sprintf("image_%d.png", ctx.Value("timestamp")))
if err := os.WriteFile(imagePath, imageData, 0644); err != nil {
return "", "", err
}
return "Here's your generated image:", imagePath, nil
}
func (h *Handler) handleVoiceOutput(ctx context.Context, text string) (string, string, error) {
audioPath, err := h.ttsService.SynthesizeToFile(ctx, text, "en-US", "en-US-Neural2-F")
if err != nil {
return "", "", err
}
return "", audioPath, nil
}
func (h *Handler) sendMessage(chatID int64, text string) {
msg := tgbotapi.NewMessage(chatID, text)
msg.ParseMode = "Markdown"
h.bot.Send(msg)
}
func (h *Handler) sendVoice(chatID int64, filePath string) {
voice := tgbotapi.NewVoice(chatID, tgbotapi.FilePath(filePath))
h.bot.Send(voice)
}
func (h *Handler) downloadFile(filePath, destPath string) error {
url := fmt.Sprintf("https://api.telegram.org/file/bot%s/%s", h.bot.Token, filePath)
resp, err := h.bot.Client.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
out, err := os.Create(destPath)
if err != nil {
return err
}
defer out.Close()
_, err = io.Copy(out, resp.Body)
return err
}
type Config struct {
GeminiAPIKey string
YouTubeAPIKey string
GoogleCloudAPIKey string
}
Here’s a detailed flow diagram showing how requests are processed:
sequenceDiagram
participant U as User
participant T as Telegram Bot
participant R as Router Model
participant G as Gemini Client
participant S as Search Service
participant Y as YouTube Service
participant I as Image Service
participant V as Voice Service
participant TT as TTS Service
U->>T: Send Message
T->>R: Route Request
R->>R: Analyze Intent
R-->>T: Routing Decision (JSON)
alt Chat Request
T->>G: Generate Response
G-->>T: Text Response
T->>U: Send Message
else Search Request
T->>S: Search Query
S->>S: Use Google Search (built-in)
S-->>T: Synthesized Answer
T->>U: Send Message
else YouTube Request
T->>Y: Search Videos
Y-->>T: Video List
T->>U: Send Video Links
else Image Request
T->>I: Generate Image
I-->>T: Image Data
T->>U: Send Image
else Voice Input
T->>V: Transcribe Audio
V-->>T: Transcript
T->>R: Route Transcript
Note over T,R: Continue as text message
else Voice Output
T->>G: Generate Text
G-->>T: Text Response
T->>TT: Synthesize Speech
TT-->>T: Audio File
T->>U: Send Voice Message
end
// cmd/bot/main.go
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"your-project/internal/bot"
)
func main() {
config := &bot.Config{
GeminiAPIKey: os.Getenv("GEMINI_API_KEY"),
YouTubeAPIKey: os.Getenv("YOUTUBE_API_KEY"),
GoogleCloudAPIKey: os.Getenv("GOOGLE_CLOUD_API_KEY"),
}
handler, err := bot.NewHandler(os.Getenv("TELEGRAM_BOT_TOKEN"), config)
if err != nil {
log.Fatalf("Failed to create handler: %v", err)
}
defer handler.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Setup graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
// Start bot
u := tgbotapi.NewUpdate(0)
u.Timeout = 60
updates := handler.bot.GetUpdatesChan(u)
go func() {
for update := range updates {
handler.HandleUpdate(ctx, update)
}
}()
log.Println("Bot started. Press Ctrl+C to stop.")
<-sigChan
log.Println("Shutting down...")
}
The routing architecture provides several significant advantages over monolithic approaches:
Cost Optimization: Simple chat requests don’t invoke expensive specialized models. By using a lightweight router model (Gemini Flash) for classification and only invoking specialized models when needed, we significantly reduce API costs. A typical chat request might cost a fraction of a cent, while a search request with Google Search enabled might cost more. By routing intelligently, we ensure users only pay for the capabilities they actually use.
Latency Reduction: Direct chat responses are faster without routing overhead. When a user asks a simple question, the router quickly identifies it as a chat request and responds directly. There’s no need to wait for search APIs or other services. This creates a snappy, responsive user experience.
Scalability: Each service can be scaled independently. If search requests become more frequent, you can scale up the search service without affecting chat handling. This modular approach allows for fine-grained resource management and cost control.
Maintainability: Clear separation of concerns makes the codebase easier to maintain. Each service has a single responsibility, making it easier to debug issues, add features, and test components in isolation. When a bug appears in image generation, you know exactly where to look.
Extensibility: New request types can be added by extending the router. Want to add support for code execution or database queries? Simply add a new request type to the router, implement the handler, and the system automatically routes requests to it. This makes the system future-proof and adaptable to new requirements.
Resource Efficiency: By using Gemini’s built-in Google Search capability instead of separate API calls, we reduce the number of external service dependencies and simplify the architecture. The model handles search internally, making API calls as needed, and synthesizes results intelligently.
Configuration management is straightforward with environment variables. This approach keeps sensitive credentials out of the codebase and makes deployment easier across different environments.
Create a .env file:
TELEGRAM_BOT_TOKEN=your_telegram_bot_token
GEMINI_API_KEY=your_gemini_api_key
YOUTUBE_API_KEY=your_youtube_api_key
GOOGLE_CLOUD_API_KEY=your_google_cloud_api_key
Note that we no longer need separate Google Search API credentials since we’re using Gemini’s built-in search capability. The Gemini API key is sufficient for both chat and search functionality when using models with Google Search enabled.
To get your API keys:
This architecture demonstrates how to build a sophisticated Telegram bot with intelligent routing. The two-stage routing system efficiently handles diverse request types while optimizing for cost and performance. By separating concerns and using specialized services, we create a maintainable and extensible solution.
The routing model acts as a smart dispatcher, ensuring each request is handled by the most appropriate service, while simple chat requests bypass unnecessary processing steps entirely. The use of Gemini’s built-in Google Search capability simplifies the architecture by eliminating the need for separate search API calls and result parsing.
The modular design allows each component to evolve independently. As new Gemini models become available or new capabilities are added, you can update individual services without affecting the entire system. This future-proofs your bot and makes it easier to adopt new technologies as they emerge.
Building a bot with this architecture requires careful consideration of error handling, resource management, and user experience. Each service should gracefully handle failures, and the handler should provide clear feedback to users when things go wrong. Proper cleanup of temporary files and efficient use of API quotas are also important considerations for production deployments.
The combination of intelligent routing, specialized services, and Gemini’s powerful capabilities creates a bot that feels natural and responsive while being cost-effective and maintainable. Whether users are chatting, searching the web, watching videos, generating images, or using voice features, the routing system ensures they get the best possible experience.