İçerikler

Go ile Real-Time Video Analizi ve Edge Processing

İçerikler

Go ile Real-Time Video Analizi ve Edge Processing

Özet

  • Edge Processing: Veriyi merkeze göndermeden lokal olarak işleme
  • Go + Video Pipeline: Goroutine ve channel yapısı ile yüksek performanslı video işleme
  • Production Ready: Motion detection, object detection, event publishing, monitoring
  • Maliyet Tasarrufu: Cloud processing’e göre %95+ tasarruf

Not: Bu makale, production ortamında kullanılan bir video analiz sisteminin temel bileşenlerini paylaşmaktadır. Kod örnekleri ve mimari kararlar, gerçek proje deneyimlerinden yola çıkılarak hazırlanmıştır.

1. Giriş: Video Verisi ve Edge Processing İhtiyacı

Günümüzde video, üretilen verinin en büyük bölümünü oluşturuyor. CCTV sistemleri, canlı yayın altyapıları, akıllı şehirler, endüstriyel kameralar ve IoT cihazları; gerçek zamanlı (real-time) video analizi ihtiyacını kaçınılmaz hale getiriyor.

Bu noktada iki temel problem ortaya çıkıyor:

  1. Gecikme (latency) - Veriyi merkeze göndermek zaman alır
  2. Bant genişliği ve maliyet - Ham video akışları oldukça pahalıdır

Bu problemleri çözmenin en etkili yollarından biri Edge Processing yaklaşımıdır.

Proje Hikayesi

Bu makaleyi yazma gereği, daha önce C dili ile geliştirdiğim bir video analiz sistemini Go (Golang) ile yeniden yazma deneyimimden doğdu. C ile yazılmış sistem, performans açısından mükemmel sonuçlar veriyordu ve production ortamında başarıyla çalışıyordu. Ancak, sistemin bakımı ve geliştirilmesi süreçlerinde, Go’nun sağladığı bazı avantajların bu tür bir proje için ne kadar değerli olduğunu fark ettim.

Go ile yeniden yazma sürecinde, C’de elde ettiğim performansa çok yakın, pratikte kabul edilebilir bir performans seviyesi yakaladım. Bunun yanında, goroutine ve channel yapısının video pipeline’ları için ne kadar uygun olduğunu keşfettim. Go’nun eşzamanlılık modeli, C’deki thread yönetimi ve memory management yaklaşımlarına göre daha temiz ve güvenli bir geliştirici deneyimi sunuyor. Ayrıca Go’nun cross-platform derleme yeteneği, sistemin farklı edge cihazlarında (Raspberry Pi, x86, ARM) çalışmasını çok kolaylaştırdı. GC ve runtime overhead nedeniyle teorik olarak küçük bir fark olsa da, geliştirme hızı ve bakım kolaylığı bu farkı fazlasıyla telafi etti.

Bu deneyim sonucunda, Go ile edge video processing sisteminin hem performans hem de geliştirici deneyimi açısından güçlü bir alternatif olduğunu gördüm. C’nin performans avantajları tartışılmaz, ancak Go’nun sunduğu modern dil özellikleri ve geliştirici dostu yaklaşımı, bu tür sistemler için önemli bir tercih sebebi olabilir. Bu makale, bu deneyimlerden yola çıkarak, benzer sistemler geliştirmek isteyen geliştiricilere rehberlik etmek amacıyla hazırlanmıştır.

2. Edge Processing Nedir?

Edge processing, verinin merkezî bir sunucuya gönderilmeden, üretildiği noktaya (kamera, gateway, edge node) yakın bir yerde işlenmesidir.

2.1 Avantajları

  • Düşük gecikme - Veri lokal olarak işlendiği için milisaniyeler içinde sonuç alınır
  • Daha az bant genişliği kullanımı - Sadece analiz sonuçları (event’ler) gönderilir, ham video gönderilmez
  • Daha iyi veri gizliliği - Hassas görüntüler merkeze gönderilmez
  • Ölçeklenebilir mimari - Her edge node bağımsız çalışır, merkezi sistemde tek nokta başarısızlığı olmaz
  • Maliyet etkinlik - Cloud compute maliyetleri azalır

2.2 Kullanım Senaryoları

  • Akıllı güvenlik sistemleri - Anomali tespiti, yüz tanıma
  • Endüstriyel kalite kontrolü - Üretim hattında hızlı karar verme
  • Traffic yönetimi - Araç sayımı, trafik analizi
  • Retail analytics - Müşteri davranış analizi, heat map oluşturma

3. Neden Go (Golang)?

Go, edge ve real-time sistemler için oldukça güçlü bir adaydır:

  • Yüksek performans - Native binary, düşük GC overhead, C/C++ seviyesinde hız
  • Eşzamanlılık - Goroutine & channel yapısı ile eşzamanlı video pipeline’ları kolayca yönetilir
  • Düşük bellek kullanımı - Edge cihazların kısıtlı kaynakları için ideal
  • Cross-platform derleme - Linux ARM (Raspberry Pi), x86, Windows için tek kaynak kodu
  • Kolay containerization - Docker, k3s, Kubernetes Edge (K3s) ile uyumlu
  • Statik binary - Tüm bağımlılıklar binary’de, dağıtım kolaylığı
  • Zengin standart kütüphane - net, context, sync paketleri video işleme için mükemmel

4. Genel Mimari

5. Hızlı Başlangıç Rehberi

Sistemi kurmak ve çalıştırmak için adım adım rehber:

5.1 Gereksinimler

1
2
3
4
5
6
7
8
9
# Go 1.21+ kurulu olmalı
go version

# FFmpeg kurulu olmalı
ffmpeg -version

# (Opsiyonel) OpenCV ve GoCV için
# Linux: apt-get install libopencv-dev
# macOS: brew install opencv

5.2 Proje Yapısı

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
video-processor/
├── cmd/
│   └── main.go
├── internal/
│   ├── pipeline/
│   ├── detector/
│   ├── source/
│   └── publisher/
├── config/
│   └── config.yaml
├── go.mod
└── Dockerfile

5.3 Temel Kurulum

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# Projeyi oluştur
mkdir video-processor && cd video-processor
go mod init video-processor

# Bağımlılıkları ekle
go get github.com/eclipse/paho.mqtt.golang
go get gopkg.in/yaml.v3
go get github.com/sirupsen/logrus

# (Opsiyonel) GoCV
go get gocv.io/x/gocv

5.3.1 GoCV Kurulum Detayları ve Cross-Platform Zorlukları

GoCV kurulumu platforma göre farklılık gösterir:

Linux (Ubuntu/Debian)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# OpenCV bağımlılıklarını kur
sudo apt-get update
sudo apt-get install -y \
    libopencv-dev \
    pkg-config \
    libavcodec-dev \
    libavformat-dev \
    libavutil-dev \
    libswscale-dev

# GoCV'yi kur
go get gocv.io/x/gocv

macOS

1
2
3
4
5
# Homebrew ile OpenCV kur
brew install opencv pkg-config

# GoCV'yi kur
go get gocv.io/x/gocv

Windows

1
2
3
4
5
6
7
8
9
# Chocolatey ile OpenCV kur (veya manuel)
choco install opencv

# Environment variable ayarla
set CGO_CPPFLAGS=-IC:\opencv\build\include
set CGO_LDFLAGS=-LC:\opencv\build\x64\vc15\lib

# GoCV'yi kur
go get gocv.io/x/gocv

Docker ile Taşınabilirlik

OpenCV bağımlılıklarını Docker ile yönetmek en kolay yöntemdir:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
# Build stage
FROM golang:1.23 AS builder

# OpenCV bağımlılıklarını kur
RUN apt-get update && apt-get install -y \
    libopencv-dev \
    pkg-config \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN CGO_ENABLED=1 go build -o video-processor ./cmd/main.go

Karşılaşılan Sorunlar ve Çözümleri:

  1. CGO hatası: CGO_ENABLED=1 olmadan derleme yapılamaz
  2. pkg-config bulunamadı: pkg-config paketinin kurulu olduğundan emin olun
  3. OpenCV versiyonu uyumsuzluğu: GoCV belirli OpenCV versiyonlarıyla çalışır, versiyon kontrolü yapın
  4. Cross-compilation zorluğu: ARM için native build yapın veya Docker kullanın

Önemli Cross-Compilation Notu (ARM / Edge Cihazlar):

  • GoCV, CGO kullandığı için x86 bir makineden doğrudan GOOS=linux GOARCH=arm64 go build ... komutuyla Raspberry Pi / Jetson gibi ARM cihazlara cross-compile etmek çoğu zaman sorunsuz çalışmaz.
  • Derleme sırasında hem Go tarafında hem de native OpenCV kütüphanelerinde doğru mimariyi hedeflemek gerekir; bu da pratikte oldukça karmaşıktır.
  • Önerilen yaklaşımlar:
    • Mümkünse ARM cihazın üzerinde native build yapın (örneğin direkt Raspberry Pi üzerinde go build).
    • Veya Docker buildx + QEMU kullanarak multi-arch Docker image üretin (amd64 host’tan arm64 image build etmek için).
    • Production senaryolarında, her mimari için ayrı build pipeline tanımlamak (amd64, armv7, arm64) çoğu zaman en sağlıklı çözümdür.

5.4 İlk Test

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// cmd/main.go
package main

import (
    "context"
    "log"
    "time"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    // Test video source (dosyadan)
    source := NewVideoSource("file://test.mp4", 640, 480)
    
    if err := source.Start(ctx); err != nil {
        log.Fatal(err)
    }
    
    // İlk 10 frame'i oku
    frameCount := 0
    for frame := range source.Frames() {
        log.Printf("Frame %d alındı: %dx%d", frameCount, frame.Width, frame.Height)
        frameCount++
        if frameCount >= 10 {
            break
        }
    }
    
    source.Stop()
    log.Println("Test tamamlandı!")
}

5.5 Production’a Geçiş Checklist

Production’a geçmeden önce aşağıdaki checklist’i kontrol edin:

  • Error handling ve retry mekanizmaları eklendi
  • Logging ve monitoring kuruldu
  • Health check endpoint’leri çalışıyor
  • Configuration dosyaları hazır
  • Docker image build edildi ve test edildi
  • Load test yapıldı
  • Memory leak test edildi
  • Graceful shutdown çalışıyor
  • Alerting mekanizması kuruldu
  • Backup ve recovery planı hazır

6. Video Codec ve Format Seçimi

Edge processing için doğru codec ve format seçimi kritik öneme sahiptir. Farklı codec’lerin avantaj ve dezavantajları:

6.1 Codec Karşılaştırması

Codec Compression Ratio CPU Usage Edge Uygunluğu Önerilen Kullanım
H.264 1.0x (baseline) Düşük ⭐⭐⭐⭐⭐ Genel kullanım, en yaygın
H.265/HEVC 1.5-2.0x Orta-Yüksek ⭐⭐⭐⭐ Yüksek kalite, sınırlı bant genişliği
AV1 2.0-2.5x Çok Yüksek ⭐⭐ Gelecek için, yüksek performanslı cihazlar
MJPEG 0.3-0.5x Çok Düşük ⭐⭐⭐ Düşük latency, frame-by-frame işleme

6.2 Format Seçimi Stratejisi

Edge processing için önerilen format: RGB24 veya YUV420

RGB24:

  • ✅ Her pixel için 3 byte (R, G, B)
  • ✅ Doğrudan görüntü işleme için uygun
  • ✅ OpenCV/GoCV ile kolay entegrasyon
  • ❌ Daha büyük bellek kullanımı

YUV420:

  • ✅ Daha küçük bellek kullanımı (%50 daha az)
  • ✅ Video codec’ler için native format
  • ❌ RGB’ye dönüşüm gerekebilir
1
2
3
4
5
6
// FFmpeg'de format seçimi
// RGB24 için:
args := []string{"-pix_fmt", "rgb24"}

// YUV420 için (daha küçük):
args := []string{"-pix_fmt", "yuv420p"}

7. Video Kaynağından Frame Almak

Video kaynağından frame almak için FFmpeg’i Go uygulamamız içinde process olarak çalıştıracağız. FFmpeg, RTSP, USB kamera, dosya ve daha birçok kaynaktan video okuyabilir.

7.1 Farklı Video Kaynakları

Production ortamında farklı video kaynak türleriyle karşılaşabilirsiniz:

RTSP Stream (IP Kamera)

En yaygın kullanım senaryosu. IP kameralar genellikle RTSP protokolü üzerinden video sağlar.

USB Kamera

USB kameralar için FFmpeg’in video4linux2 (v4l2) desteğini kullanabiliriz:

1
ffmpeg -f v4l2 -i /dev/video0 -f rawvideo -pix_fmt rgb24 -s 640x480 pipe:1

Video Dosyası

Test ve geliştirme için video dosyalarından okuma:

1
ffmpeg -i test_video.mp4 -f rawvideo -pix_fmt rgb24 -s 640x480 pipe:1

HTTP/HTTPS Stream

Bazı modern kameralar HTTP üzerinden MJPEG veya HLS stream sağlar.

7.2 FFmpeg Komutu

1
2
ffmpeg -rtsp_transport tcp -i rtsp://username:password@camera-ip:554/stream \
       -f rawvideo -pix_fmt rgb24 -s 640x480 pipe:1

Bu komut:

  • -rtsp_transport tcp: RTSP bağlantısı için TCP kullanır (güvenilir)
  • -f rawvideo: Ham video formatı çıktısı
  • -pix_fmt rgb24: RGB24 pixel formatı (her pixel 3 byte)
  • -s 640x480: Çözünürlük ayarı (gereksinime göre değiştirilebilir)
  • pipe:1: Stdout’a yazar (Go’da okuyacağız)

7.3 Go ile FFmpeg Entegrasyonu

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package main

import (
    "bufio"
    "context"
    "fmt"
    "io"
    "log"
    "os/exec"
    "sync"
    "time"
)

type Frame struct {
    Data      []byte
    Width     int
    Height    int
    Timestamp time.Time
}

type VideoSource struct {
    sourceURL string
    width     int
    height    int
    cmd       *exec.Cmd
    stdout    io.ReadCloser
    frames    chan *Frame
    errors    chan error
    wg        sync.WaitGroup
}

func NewVideoSource(sourceURL string, width, height int) *VideoSource {
    return &VideoSource{
        sourceURL: sourceURL,
        width:     width,
        height:    height,
        frames:    make(chan *Frame, 10), // Buffer 10 frames
        errors:    make(chan error, 1),
    }
}

func (vs *VideoSource) Start(ctx context.Context) error {
    // FFmpeg komutunu oluştur
    args := []string{
        "-rtsp_transport", "tcp",
        "-i", vs.sourceURL,
        "-f", "rawvideo",
        "-pix_fmt", "rgb24",
        "-s", fmt.Sprintf("%dx%d", vs.width, vs.height),
        "pipe:1",
    }
    
    vs.cmd = exec.CommandContext(ctx, "ffmpeg", args...)
    
    var err error
    vs.stdout, err = vs.cmd.StdoutPipe()
    if err != nil {
        return fmt.Errorf("stdout pipe: %w", err)
    }

    stderr, err := vs.cmd.StderrPipe()
    if err != nil {
        return fmt.Errorf("stderr pipe: %w", err)
    }

    if err := vs.cmd.Start(); err != nil {
        return fmt.Errorf("start ffmpeg: %w", err)
    }

    // FFmpeg stderr loglarını ayrı goroutine'de oku
    go func() {
        scanner := bufio.NewScanner(stderr)
        for scanner.Scan() {
            log.Printf("FFmpeg: %s", scanner.Text())
        }
    }()
    
    // Frame okuma goroutine'i
    vs.wg.Add(1)
    go vs.readFrames(ctx)
    
    return nil
}

func (vs *VideoSource) readFrames(ctx context.Context) {
    defer vs.wg.Done()
    
    frameSize := vs.width * vs.height * 3 // RGB24: her pixel 3 byte
    
    frameBuffer := make([]byte, frameSize)
    
    for {
        select {
        case <-ctx.Done():
            // Context iptal olduğunda FFmpeg process'ini de sonlandır
            if vs.cmd != nil && vs.cmd.Process != nil {
                _ = vs.cmd.Process.Kill()
            }
            return
        default:
            // Bir frame'in tamamını oku
            n, err := io.ReadFull(vs.stdout, frameBuffer)
            if err != nil {
                if err != io.EOF && err != io.ErrUnexpectedEOF {
                    select {
                    case vs.errors <- fmt.Errorf("read frame: %w", err):
                    case <-ctx.Done():
                    }
                }
                return
            }
            
            if n != frameSize {
                continue // Eksik frame, atla
            }
            
            // Yeni frame buffer'ı kopyala (goroutine safe)
            frameData := make([]byte, frameSize)
            copy(frameData, frameBuffer)
            
            frame := &Frame{
                Data:      frameData,
                Width:     vs.width,
                Height:    vs.height,
                Timestamp: time.Now(),
            }
            
            select {
            case vs.frames <- frame:
            case <-ctx.Done():
                return
            }
        }
    }
}

func (vs *VideoSource) Frames() <-chan *Frame {
    return vs.frames
}

func (vs *VideoSource) Errors() <-chan error {
    return vs.errors
}

func (vs *VideoSource) Stop() error {
    if vs.cmd != nil && vs.cmd.Process != nil {
        vs.cmd.Process.Kill()
    }
    vs.wg.Wait()
    close(vs.frames)
    close(vs.errors)
    return nil
}

8. Go ile Video Pipeline

Go’nun goroutine ve channel yapısı, video işleme pipeline’ları için mükemmeldir. Her aşama bir goroutine olarak çalışır ve channel’lar üzerinden frame’ler geçirilir.

8.1 Pipeline Mimarisi

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type Pipeline struct {
    source    *VideoSource
    processors []FrameProcessor
    wg        sync.WaitGroup
}

type FrameProcessor interface {
    Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame)
    Name() string
}

func NewPipeline(source *VideoSource) *Pipeline {
    return &Pipeline{
        source:     source,
        processors: make([]FrameProcessor, 0),
    }
}

func (p *Pipeline) AddProcessor(processor FrameProcessor) {
    p.processors = append(p.processors, processor)
}

func (p *Pipeline) Run(ctx context.Context) error {
    // Video kaynağını başlat
    if err := p.source.Start(ctx); err != nil {
        return err
    }
    
    // Channel'ları oluştur
    channels := make([]chan *Frame, len(p.processors)+1)
    for i := range channels {
        channels[i] = make(chan *Frame, 10)
    }
    
    // Source'dan ilk channel'a kopyala
    p.wg.Add(1)
    go func() {
        defer p.wg.Done()
        defer close(channels[0])
        for frame := range p.source.Frames() {
            select {
            case channels[0] <- frame:
            case <-ctx.Done():
                return
            }
        }
    }()
    
    // Her processor'ı çalıştır
    for i, proc := range p.processors {
        p.wg.Add(1)
        go func(idx int, processor FrameProcessor) {
            defer p.wg.Done()
            defer func() {
                if r := recover(); r != nil {
                    fmt.Printf("Processor panic: %v\n", r)
                }
                close(channels[idx+1])
            }()
            processor.Process(ctx, channels[idx], channels[idx+1])
        }(i, proc)
    }
    
    // Hata kontrolü
    go func() {
        for err := range p.source.Errors() {
            // Log error (örnek)
            fmt.Printf("Video source error: %v\n", err)
        }
    }()
    
    return nil
}

func (p *Pipeline) Stop() error {
    return p.source.Stop()
}

8.2 Frame Struct İyileştirmesi

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
type Frame struct {
    Data      []byte
    Width     int
    Height    int
    Timestamp time.Time
    Metadata  map[string]interface{} // Analiz sonuçları için
}

func NewFrame(data []byte, width, height int) *Frame {
    return &Frame{
        Data:      data,
        Width:     width,
        Height:    height,
        Timestamp: time.Now(),
        Metadata:  make(map[string]interface{}),
    }
}

9. Gerçek Zamanlı Video Analizi

9.0 ROI (Region of Interest) Detection

Tüm frame’i işlemek yerine, sadece ilgili bölgeleri (ROI) işleyerek CPU kullanımını önemli ölçüde azaltabiliriz:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import (
    "image"
    "sync"
)

type ROIDetector struct {
    regions []image.Rectangle
    mutex   sync.RWMutex
}

func NewROIDetector() *ROIDetector {
    return &ROIDetector{
        regions: make([]image.Rectangle, 0),
    }
}

// Motion detection sonuçlarına göre ROI belirle
func (rd *ROIDetector) UpdateROI(motionMap []bool, width, height int) {
    rd.mutex.Lock()
    defer rd.mutex.Unlock()
    
    // Motion olan bölgeleri tespit et ve bounding box oluştur
    // Basit implementasyon: motion olan pixel'lerin bounding box'ı
    minX, minY := width, height
    maxX, maxY := 0, 0
    
    for y := 0; y < height; y++ {
        for x := 0; x < width; x++ {
            idx := y*width + x
            if idx < len(motionMap) && motionMap[idx] {
                if x < minX {
                    minX = x
                }
                if x > maxX {
                    maxX = x
                }
                if y < minY {
                    minY = y
                }
                if y > maxY {
                    maxY = y
                }
            }
        }
    }
    
    if maxX > minX && maxY > minY {
        // Padding ekle
        padding := 20
        roi := image.Rect(
            maxInt(0, minX-padding),
            maxInt(0, minY-padding),
            minInt(width, maxX+padding),
            minInt(height, maxY+padding),
        )
        rd.regions = []image.Rectangle{roi}
    }
}

func maxInt(a, b int) int {
    if a > b {
        return a
    }
    return b
}

func minInt(a, b int) int {
    if a < b {
        return a
    }
    return b
}

// Frame'in sadece ROI bölgesini çıkar
func (rd *ROIDetector) ExtractROI(frame *Frame) []*Frame {
    rd.mutex.RLock()
    defer rd.mutex.RUnlock()
    
    roiFrames := make([]*Frame, 0, len(rd.regions))
    
    for _, roi := range rd.regions {
        roiData := extractRegion(frame.Data, frame.Width, frame.Height, roi)
        roiFrame := &Frame{
            Data:      roiData,
            Width:     roi.Dx(),
            Height:    roi.Dy(),
            Timestamp: frame.Timestamp,
            Metadata: map[string]interface{}{
                "roi":        roi,
                "original_width":  frame.Width,
                "original_height": frame.Height,
            },
        }
        roiFrames = append(roiFrames, roiFrame)
    }
    
    return roiFrames
}

func extractRegion(data []byte, width, height int, roi image.Rectangle) []byte {
    roiWidth := roi.Dx()
    roiHeight := roi.Dy()

    // ROI'nin frame sınırları içinde olduğundan emin ol (bounds check)
    if roi.Min.X < 0 || roi.Min.Y < 0 || roi.Max.X > width || roi.Max.Y > height {
        return nil
    }

    roiData := make([]byte, roiWidth*roiHeight*3)

    for y := 0; y < roiHeight; y++ {
        srcY := roi.Min.Y + y
        srcOffset := (srcY*width + roi.Min.X) * 3
        dstOffset := (y * roiWidth) * 3

        // Her satırı tek seferde kopyala (daha hızlı, daha az bounds check)
        copy(roiData[dstOffset:dstOffset+roiWidth*3],
            data[srcOffset:srcOffset+roiWidth*3])
    }

    return roiData
}

ROI Kullanımının Avantajları:

  • %60-80 CPU tasarrufu - Sadece ilgili bölgeleri işleme
  • Daha hızlı object detection - Küçük ROI’ler üzerinde inference
  • Daha az bellek kullanımı - Sadece ROI frame’leri saklama

9.1 Motion Detection

Motion detection, frame’ler arasındaki farkları hesaplayarak hareket tespiti yapar. Edge sistemlerde hafif ve hızlı bir yöntemdir.

Frame Differencing Algoritması

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package main

import (
    "context"
    "sync"
)

type MotionDetector struct {
    threshold     float64
    previousFrame []byte
    width, height int
    mutex         sync.RWMutex // previousFrame için thread-safety
}

func NewMotionDetector(threshold float64, width, height int) *MotionDetector {
    return &MotionDetector{
        threshold: threshold,
        width:     width,
        height:    height,
    }
}

func (md *MotionDetector) Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
    for frame := range in {
        select {
        case <-ctx.Done():
            return
        default:
            hasMotion := md.detectMotion(frame)
            frame.Metadata["motion_detected"] = hasMotion
            
            if hasMotion {
                frame.Metadata["motion_score"] = md.calculateMotionScore(frame)
            }
            
            select {
            case out <- frame:
            case <-ctx.Done():
                return
            }
        }
    }
}

func (md *MotionDetector) Name() string {
    return "MotionDetector"
}

func (md *MotionDetector) detectMotion(frame *Frame) bool {
    md.mutex.Lock()
    defer md.mutex.Unlock()
    
    if md.previousFrame == nil {
        // İlk frame, sakla ve hareket yok
        md.previousFrame = make([]byte, len(frame.Data))
        copy(md.previousFrame, frame.Data)
        return false
    }
    
    diff := md.calculateDifference(frame.Data, md.previousFrame)
    motionRatio := float64(diff) / float64(len(frame.Data))
    
    // Önceki frame buffer'ını yeniden kullan (gerekiyorsa yeniden allocate et)
    if md.previousFrame == nil || len(md.previousFrame) != len(frame.Data) {
        md.previousFrame = make([]byte, len(frame.Data))
    }
    copy(md.previousFrame, frame.Data)
    
    return motionRatio > md.threshold
}

func (md *MotionDetector) calculateDifference(current, previous []byte) int {
    diff := 0
    for i := 0; i < len(current) && i < len(previous); i++ {
        if current[i] > previous[i] {
            diff += int(current[i] - previous[i])
        } else {
            diff += int(previous[i] - current[i])
        }
    }
    return diff
}

func (md *MotionDetector) calculateMotionScore(frame *Frame) float64 {
    md.mutex.RLock()
    defer md.mutex.RUnlock()
    
    if md.previousFrame == nil {
        return 0.0
    }
    
    diff := md.calculateDifference(frame.Data, md.previousFrame)
    maxDiff := len(frame.Data) * 255
    return float64(diff) / float64(maxDiff)
}

9.2 Object Detection

Object detection için GoCV (OpenCV binding) veya external inference servisleri kullanılabilir.

GoCV ile Basit Object Detection

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package main

import (
    "context"
    "fmt"
    "image"
    "os"
    "strings"
    "gocv.io/x/gocv"
)

type ObjectDetector struct {
    net    gocv.Net
    classes []string
}

func NewObjectDetector(modelPath, configPath, classesPath string) (*ObjectDetector, error) {
    net := gocv.ReadNet(modelPath, configPath)
    if net.Empty() {
        return nil, fmt.Errorf("failed to load model")
    }
    
    // GPU varsa kullan (opsiyonel)
    net.SetPreferableBackend(gocv.NetBackendOpenVINO)
    net.SetPreferableTarget(gocv.NetTargetCPU) // veya NetTargetVPU
    
    // Classes dosyasını oku
    classes, err := loadClasses(classesPath)
    if err != nil {
        return nil, fmt.Errorf("failed to load classes: %w", err)
    }
    
    return &ObjectDetector{
        net:     net,
        classes: classes,
    }, nil
}

func (od *ObjectDetector) Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
    for frame := range in {
        select {
        case <-ctx.Done():
            return
        default:
            detections := od.detect(frame)
            frame.Metadata["detections"] = detections
            
            select {
            case out <- frame:
            case <-ctx.Done():
                return
            }
        }
    }
}

func (od *ObjectDetector) Name() string {
    return "ObjectDetector"
}

func (od *ObjectDetector) detect(frame *Frame) []Detection {
    // Frame data'yı gocv.Mat'e dönüştür
    img, err := gocv.NewMatFromBytes(
        frame.Height,
        frame.Width,
        gocv.MatTypeCV8UC3,
        frame.Data,
    )
    if err != nil {
        return nil
    }
    defer img.Close()
    
    // Blob oluştur (model input formatı)
    blob := gocv.BlobFromImage(img, 1.0/255.0, image.Pt(416, 416), gocv.NewScalar(0, 0, 0, 0), true, false)
    defer blob.Close()
    
    // Inference
    od.net.SetInput(blob, "")
    prob := od.net.Forward("")
    defer prob.Close()
    
    // Sonuçları parse et
    detections := parseDetections(prob, frame.Width, frame.Height)
    
    return detections
}

type Detection struct {
    Class      string
    Confidence float64
    BBox       image.Rectangle
}

func parseDetections(prob gocv.Mat, imgWidth, imgHeight int) []Detection {
    // Model output'una göre parse et
    // Bu örnek basitleştirilmiş, gerçek implementasyon model'e bağlı
    return []Detection{}
}

func loadClasses(path string) ([]string, error) {
    data, err := os.ReadFile(path)
    if err != nil {
        return nil, fmt.Errorf("failed to read classes file: %w", err)
    }
    
    lines := strings.Split(string(data), "\n")
    classes := make([]string, 0, len(lines))
    for _, line := range lines {
        line = strings.TrimSpace(line)
        if line != "" {
            classes = append(classes, line)
        }
    }
    
    return classes, nil
}

External Inference Service (HTTP/REST)

Model çok büyükse veya özel hardware (NVIDIA Jetson, Intel Neural Compute Stick) varsa, inference’ı ayrı bir servise taşıyabiliriz:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package main

import (
    "bytes"
    "context"
    "encoding/base64"
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

type ExternalDetector struct {
    client  *http.Client
    apiURL  string
    timeout time.Duration
}

func (ed *ExternalDetector) Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
    for frame := range in {
        select {
        case <-ctx.Done():
            return
        default:
            detections, err := ed.callInferenceService(frame)
            if err == nil {
                frame.Metadata["detections"] = detections
            } else {
                frame.Metadata["detection_error"] = err.Error()
            }
            
            select {
            case out <- frame:
            case <-ctx.Done():
                return
            }
        }
    }
}

func (ed *ExternalDetector) callInferenceService(frame *Frame) ([]Detection, error) {
    // Frame'i base64 encode et veya binary gönder
    reqBody, err := json.Marshal(map[string]interface{}{
        "image": base64.StdEncoding.EncodeToString(frame.Data),
        "width": frame.Width,
        "height": frame.Height,
    })
    if err != nil {
        return nil, err
    }
    
    ctx, cancel := context.WithTimeout(context.Background(), ed.timeout)
    defer cancel()
    
    req, err := http.NewRequestWithContext(ctx, "POST", ed.apiURL, bytes.NewReader(reqBody))
    if err != nil {
        return nil, err
    }
    req.Header.Set("Content-Type", "application/json")
    
    resp, err := ed.client.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    
    var result struct {
        Detections []Detection `json:"detections"`
    }
    
    if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
        return nil, err
    }
    
    return result.Detections, nil
}

10. Edge Decision Layer

Edge Decision Layer, analiz sonuçlarına göre karar verir ve sadece önemli event’leri cloud’a gönderir. Bu katman:

  • Threshold filtering - Düşük confidence’ları filtreler
  • Event deduplication - Aynı event’i tekrar tekrar göndermez
  • Rate limiting - Çok fazla event göndermeyi engeller
  • Event aggregation - Birden fazla event’i birleştirir

10.1 Event Publisher Implementasyonları

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type EdgeDecisionLayer struct {
    minConfidence  float64
    eventPublisher EventPublisher
    lastEvents     map[string]time.Time
    mutex          sync.RWMutex
    cooldown       time.Duration
}

type EventPublisher interface {
    PublishWithContext(ctx context.Context, event *Event) error
}

type Event struct {
    Type       string                 `json:"type"`
    Timestamp  time.Time              `json:"timestamp"`
    Confidence float64                `json:"confidence"`
    Detections []Detection            `json:"detections,omitempty"`
    Metadata   map[string]interface{} `json:"metadata"`
}

func NewEdgeDecisionLayer(minConfidence float64, publisher EventPublisher, cooldown time.Duration) *EdgeDecisionLayer {
    return &EdgeDecisionLayer{
        minConfidence:  minConfidence,
        eventPublisher: publisher,
        lastEvents:     make(map[string]time.Time),
        cooldown:       cooldown,
    }
}

func (edl *EdgeDecisionLayer) Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
    for frame := range in {
        select {
        case <-ctx.Done():
            return
        default:
            edl.processFrame(frame)
            
            // Frame'i pipeline'a devam ettir
            select {
            case out <- frame:
            case <-ctx.Done():
                return
            }
        }
    }
}

func (edl *EdgeDecisionLayer) Name() string {
    return "EdgeDecisionLayer"
}

func (edl *EdgeDecisionLayer) processFrame(frame *Frame) {
    // Detections varsa kontrol et
    detections, ok := frame.Metadata["detections"].([]Detection)
    if !ok {
        return
    }
    
    // Yüksek confidence'lı detections'ları filtrele
    significantDetections := edl.filterByConfidence(detections)
    if len(significantDetections) == 0 {
        return
    }
    
    // Event tipini belirle
    eventType := edl.determineEventType(significantDetections)
    
    // Cooldown kontrolü (aynı event'i çok sık gönderme)
    if !edl.shouldSendEvent(eventType) {
        return
    }
    
    // Event oluştur
    event := &Event{
        Type:       eventType,
        Timestamp:  frame.Timestamp,
        Confidence: edl.calculateMaxConfidence(significantDetections),
        Detections: significantDetections,
        Metadata: map[string]interface{}{
            "frame_width":  frame.Width,
            "frame_height": frame.Height,
        },
    }
    
    // Pipeline'ı bloklamamak için event'i async ve timeout'lu publish et
    go func(evt *Event, evtType string) {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()

        if err := edl.eventPublisher.PublishWithContext(ctx, evt); err != nil {
            fmt.Printf("Failed to publish event: %v\n", err)
            return
        }

        // Başarılı publish sonrası cooldown güncelle
        edl.mutex.Lock()
        edl.lastEvents[evtType] = time.Now()
        edl.mutex.Unlock()
    }(event, eventType)
}

func (edl *EdgeDecisionLayer) filterByConfidence(detections []Detection) []Detection {
    filtered := make([]Detection, 0)
    for _, det := range detections {
        if det.Confidence >= edl.minConfidence {
            filtered = append(filtered, det)
        }
    }
    return filtered
}

func (edl *EdgeDecisionLayer) determineEventType(detections []Detection) string {
    // Basit logic: ilk detection'ın class'ını kullan
    // Gerçek uygulamada daha karmaşık logic olabilir
    if len(detections) > 0 {
        return detections[0].Class + "_detected"
    }
    return "unknown"
}

func (edl *EdgeDecisionLayer) shouldSendEvent(eventType string) bool {
    edl.mutex.RLock()
    lastTime, exists := edl.lastEvents[eventType]
    edl.mutex.RUnlock()
    
    if !exists {
        return true
    }
    
    return time.Since(lastTime) > edl.cooldown
}

func (edl *EdgeDecisionLayer) calculateMaxConfidence(detections []Detection) float64 {
    max := 0.0
    for _, det := range detections {
        if det.Confidence > max {
            max = det.Confidence
        }
    }
    return max
}

MQTT Publisher

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package main

import (
    "encoding/json"
    mqtt "github.com/eclipse/paho.mqtt.golang"
)

type MQTTPublisher struct {
    client mqtt.Client
    topic  string
}

func NewMQTTPublisher(brokerURL, topic, clientID string) (*MQTTPublisher, error) {
    opts := mqtt.NewClientOptions()
    opts.AddBroker(brokerURL)
    opts.SetClientID(clientID)
    
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        return nil, token.Error()
    }
    
    return &MQTTPublisher{
        client: client,
        topic:  topic,
    }, nil
}

func (mp *MQTTPublisher) PublishWithContext(ctx context.Context, event *Event) error {
    data, err := json.Marshal(event)
    if err != nil {
        return err
    }
    
    token := mp.client.Publish(mp.topic, 1, false, data)

    done := make(chan struct{})
    go func() {
        token.Wait()
        close(done)
    }()

    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-done:
        return token.Error()
    }
}

func (mp *MQTTPublisher) Close() error {
    if mp.client != nil && mp.client.IsConnected() {
        // Bekleyen operasyonlar için 250ms timeout ile disconnect
        mp.client.Disconnect(250)
    }
    return nil
}

HTTP Webhook Publisher

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

type WebhookPublisher struct {
    client  *http.Client
    webhookURL string
}

func (wp *WebhookPublisher) PublishWithContext(ctx context.Context, event *Event) error {
    data, err := json.Marshal(event)
    if err != nil {
        return err
    }
    
    req, err := http.NewRequestWithContext(ctx, "POST", wp.webhookURL, bytes.NewReader(data))
    if err != nil {
        return err
    }
    req.Header.Set("Content-Type", "application/json")
    
    resp, err := wp.client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    
    if resp.StatusCode >= 400 {
        return fmt.Errorf("webhook returned status %d", resp.StatusCode)
    }
    
    return nil
}

11. Multi-Camera Desteği

Production ortamında genellikle birden fazla kamera yönetmek gerekir. Her kamera için ayrı pipeline oluşturarak paralel işleme yapabiliriz:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
import (
    "context"
    "fmt"
    "log"
    "sync"
)

type MultiCameraManager struct {
    cameras   map[string]*CameraPipeline
    mutex     sync.RWMutex
    ctx       context.Context
    cancel    context.CancelFunc
}

type CameraPipeline struct {
    ID       string
    Source   *VideoSource
    Pipeline *Pipeline
    Config   CameraConfig
}

type CameraConfig struct {
    URL           string
    Width         int
    Height        int
    MotionEnabled bool
    ObjectEnabled bool
}

func NewMultiCameraManager() *MultiCameraManager {
    ctx, cancel := context.WithCancel(context.Background())
    return &MultiCameraManager{
        cameras: make(map[string]*CameraPipeline),
        ctx:     ctx,
        cancel:  cancel,
    }
}

func (mcm *MultiCameraManager) AddCamera(id string, config CameraConfig) error {
    mcm.mutex.Lock()
    defer mcm.mutex.Unlock()
    
    source := NewVideoSource(config.URL, config.Width, config.Height)
    pipeline := NewPipeline(source)
    
    // Motion detector ekle
    if config.MotionEnabled {
        motionDetector := NewMotionDetector(0.05, config.Width, config.Height)
        pipeline.AddProcessor(motionDetector)
    }
    
    // Object detector ekle
    if config.ObjectEnabled {
        objectDetector, err := NewObjectDetector("model.onnx", "config.yaml", "classes.txt")
        if err == nil {
            pipeline.AddProcessor(objectDetector)
        }
    }
    
    cameraPipeline := &CameraPipeline{
        ID:       id,
        Source:   source,
        Pipeline: pipeline,
        Config:   config,
    }
    
    mcm.cameras[id] = cameraPipeline
    
    // Pipeline'ı başlat
    go func() {
        if err := pipeline.Run(mcm.ctx); err != nil {
            log.Printf("Camera %s pipeline hatası: %v", id, err)
        }
    }()
    
    return nil
}

func (mcm *MultiCameraManager) RemoveCamera(id string) error {
    mcm.mutex.Lock()
    defer mcm.mutex.Unlock()
    
    if camera, exists := mcm.cameras[id]; exists {
        camera.Pipeline.Stop()
        delete(mcm.cameras, id)
    }
    
    return nil
}

func (mcm *MultiCameraManager) GetCameraStatus(id string) (map[string]interface{}, error) {
    mcm.mutex.RLock()
    defer mcm.mutex.RUnlock()
    
    camera, exists := mcm.cameras[id]
    if !exists {
        return nil, fmt.Errorf("camera %s not found", id)
    }
    
    return map[string]interface{}{
        "id":       camera.ID,
        "url":      camera.Config.URL,
        "status":   "running",
        "width":    camera.Config.Width,
        "height":   camera.Config.Height,
    }, nil
}

12. Performans ve Optimizasyon

12.1 Memory Pooling

Video işleme, çok fazla bellek allocation’ı gerektirir. sync.Pool kullanarak bellek allocation overhead’ini azaltabiliriz:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
var framePool = sync.Pool{
    New: func() interface{} {
        // Frame buffer'ı için pool
        return make([]byte, 640*480*3) // RGB24, 640x480
    },
}

// Frame okurken pool'dan al (doğru kullanım örneği)
func (vs *VideoSource) readFrames(ctx context.Context) {
    frameSize := vs.width * vs.height * 3

    for {
        frameBuffer := framePool.Get().([]byte)

        n, err := io.ReadFull(vs.stdout, frameBuffer)
        if err != nil {
            // Hata durumunda buffer'ı hemen pool'a geri ver
            framePool.Put(frameBuffer)

            if err == io.EOF || err == io.ErrUnexpectedEOF {
                return
            }

            select {
            case vs.errors <- fmt.Errorf("read frame: %w", err):
            case <-ctx.Done():
            }
            return
        }

        if n != frameSize {
            framePool.Put(frameBuffer)
            continue
        }

        // Frame data'yı pool'dan bağımsız olacak şekilde kopyala
        frameData := make([]byte, frameSize)
        copy(frameData, frameBuffer)

        // Buffer'ı pool'a geri ver
        framePool.Put(frameBuffer)

        frame := &Frame{
            Data:      frameData,
            Width:     vs.width,
            Height:    vs.height,
            Timestamp: time.Now(),
            Metadata:  make(map[string]interface{}),
        }

        select {
        case vs.frames <- frame:
        case <-ctx.Done():
            return
        }
    }
}

12.2 Frame Rate Kontrolü

Bazı durumlarda tüm frame’leri işlemek gerekmez. Frame skipping ile CPU kullanımını azaltabiliriz:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
type FrameRateController struct {
    targetFPS    int
    frameCounter int
    skipRatio    int
}

func NewFrameRateController(targetFPS int) *FrameRateController {
    // Örneğin: 30 FPS'den 10 FPS'e düşürmek için her 3 frame'den 1'ini al
    skipRatio := 30 / targetFPS
    return &FrameRateController{
        targetFPS: targetFPS,
        skipRatio: skipRatio,
    }
}

func (frc *FrameRateController) ShouldProcess() bool {
    frc.frameCounter++
    return frc.frameCounter%frc.skipRatio == 0
}

12.3 Context ve Cancellation

Uzun süren işlemlerde context cancellation kullanarak kaynakları doğru şekilde temizleyelim:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import (
    "os"
    "os/signal"
    "syscall"
)

func (p *Pipeline) Run(ctx context.Context) error {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    
    // Graceful shutdown için signal handling
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    
    go func() {
        <-sigChan
        cancel()
    }()
    
    // Pipeline'ı çalıştır
    // ...
    
    return nil
}

12.4 Benchmarking

Performans metriklerini toplamak için:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type PerformanceMetrics struct {
    FramesProcessed int64
    ProcessingTime  time.Duration
    FrameRate       float64
    startTime       time.Time
    mutex           sync.RWMutex
}

func NewPerformanceMetrics() *PerformanceMetrics {
    return &PerformanceMetrics{
        startTime: time.Now(),
    }
}

func (pm *PerformanceMetrics) RecordFrame(processingTime time.Duration) {
    pm.mutex.Lock()
    defer pm.mutex.Unlock()
    pm.FramesProcessed++
    pm.ProcessingTime += processingTime
    elapsed := time.Since(pm.startTime).Seconds()
    if elapsed > 0 {
        pm.FrameRate = float64(pm.FramesProcessed) / elapsed
    }
}

12.5 Performans Metrikleri (Gerçek Test Sonuçları)

Test ortamında elde ettiğimiz bazı gerçek performans değerleri:

  • Hardware: Raspberry Pi 4 (4GB RAM), 640x480 çözünürlük
  • Motion Detection: ~25-30 FPS (CPU: %15-20)
  • Object Detection (CPU): ~2-3 FPS (CPU: %80-90)
  • Object Detection (GPU/OpenVINO): ~8-10 FPS (CPU: %40-50)
  • Memory Kullanımı: ~150-200 MB (memory pool ile)
  • Network Latency (MQTT): ~10-50ms (local broker)

Önemli Not: Object detection için CPU kullanımı çok yüksek. Production’da GPU veya dedicated AI accelerator (Jetson Nano, Neural Compute Stick) kullanılması önerilir.

12.6 Error Handling ve Resilience

Retry Mekanizması

Video kaynağı bağlantıları kesilebilir. Retry mekanizması ekleyelim:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func (vs *VideoSource) StartWithRetry(ctx context.Context, maxRetries int) error {
    var lastErr error
    for i := 0; i < maxRetries; i++ {
        if err := vs.Start(ctx); err == nil {
            return nil
        }
        lastErr = err
        
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(time.Second * time.Duration(i+1)):
            // Exponential backoff
        }
    }
    return fmt.Errorf("failed after %d retries: %w", maxRetries, lastErr)
}

Graceful Degradation

Object detection servisi çalışmıyorsa, sadece motion detection ile devam edebiliriz:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (od *ObjectDetector) Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
    for frame := range in {
        select {
        case <-ctx.Done():
            return
        default:
            detections, err := od.detect(frame)
            if err != nil {
                // Error log'la ama pipeline'ı durdurma
                frame.Metadata["detection_error"] = err.Error()
            } else {
                frame.Metadata["detections"] = detections
            }
            
            select {
            case out <- frame:
            case <-ctx.Done():
                return
            }
        }
    }
}

12.7 Monitoring ve Logging

Structured Logging

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import "github.com/sirupsen/logrus"

var logger = logrus.New()

func init() {
    logger.SetFormatter(&logrus.JSONFormatter{})
    logger.SetLevel(logrus.InfoLevel)
}

// Kullanım
logger.WithFields(logrus.Fields{
    "frame_id":    frameID,
    "fps":         fps,
    "detections":  len(detections),
    "processing_time_ms": processingTime.Milliseconds(),
}).Info("Frame processed")

Metrics Export (Prometheus)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import "github.com/prometheus/client_golang/prometheus"

var (
    framesProcessed = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "video_frames_processed_total",
            Help: "Total number of frames processed",
        },
        []string{"source"},
    )
    
    processingDuration = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "video_frame_processing_seconds",
            Help:    "Frame processing duration",
            Buckets: prometheus.DefBuckets,
        },
        []string{"stage"},
    )
)

func init() {
    prometheus.MustRegister(framesProcessed)
    prometheus.MustRegister(processingDuration)
}

Metrics HTTP Endpoint

Prometheus metrics’lerini expose etmek için HTTP endpoint ekleyelim:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import (
    "net/http"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

func startMetricsServer(port string) error {
    http.Handle("/metrics", promhttp.Handler())
    return http.ListenAndServe(":"+port, nil)
}

// main.go içinde
go func() {
    if err := startMetricsServer("2112"); err != nil {
        log.Printf("Metrics server error: %v", err)
    }
}()

12.8 Profiling ve Performans Analizi

Go’nun built-in profiling araçları ile performans analizi yapabiliriz.

pprof ile CPU Profiling

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import (
    _ "net/http/pprof"
    "net/http"
)

func main() {
    // Profiling endpoint'lerini ekle
    go func() {
        log.Println(http.ListenAndServe("localhost:6060", nil))
    }()
    
    // ... uygulama kodunuz
}

// Profil toplamak için:
// go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30
// go tool pprof http://localhost:6060/debug/pprof/heap

Memory Profiling

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import (
    "runtime"
    "runtime/pprof"
    "os"
)

func captureMemoryProfile(filename string) error {
    f, err := os.Create(filename)
    if err != nil {
        return err
    }
    defer f.Close()
    
    runtime.GC() // GC'yi zorla çalıştır
    return pprof.WriteHeapProfile(f)
}

// Kullanım: Periyodik olarak memory profile al
go func() {
    ticker := time.NewTicker(5 * time.Minute)
    defer ticker.Stop()
    for range ticker.C {
        captureMemoryProfile("memprofile.prof")
    }
}()

Goroutine Profiling

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import (
    "runtime/pprof"
    "os"
)

func captureGoroutineProfile(filename string) error {
    f, err := os.Create(filename)
    if err != nil {
        return err
    }
    defer f.Close()
    
    return pprof.Lookup("goroutine").WriteTo(f, 0)
}

13. Testing

Video işleme sistemlerinde test yazmak için mock’lar ve test utilities kullanmalıyız. Production sistemler için kapsamlı test stratejisi kritik öneme sahiptir.

13.1 Unit Test Örneği

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
    "testing"
    "time"
)

func TestMotionDetector(t *testing.T) {
    detector := NewMotionDetector(0.05, 640, 480)
    
    // İlk frame - motion olmamalı
    frame1 := &Frame{
        Data:      make([]byte, 640*480*3),
        Width:     640,
        Height:    480,
        Timestamp: time.Now(),
        Metadata:  make(map[string]interface{}),
    }
    
    hasMotion := detector.detectMotion(frame1)
    if hasMotion {
        t.Error("İlk frame'de motion olmamalı")
    }
    
    // İkinci frame - değişiklik yap
    frame2 := &Frame{
        Data:      make([]byte, 640*480*3),
        Width:     640,
        Height:    480,
        Timestamp: time.Now(),
        Metadata:  make(map[string]interface{}),
    }
    
    // Frame2'de değişiklik yap (pixel'leri değiştir)
    for i := 0; i < len(frame2.Data); i += 100 {
        frame2.Data[i] = 255
    }
    
    hasMotion = detector.detectMotion(frame2)
    if !hasMotion {
        t.Error("Değişiklik olan frame'de motion olmalı")
    }
}

13.2 Integration Test

Gerçek bileşenlerin birlikte çalışmasını test eden integration testler:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func TestVideoPipelineIntegration(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    // Gerçek FFmpeg process ile test (test video dosyası kullan)
    source := NewVideoSource("test_video.mp4", 640, 480)
    
    pipeline := NewPipeline(source)
    motionDetector := NewMotionDetector(0.05, 640, 480)
    objectDetector := NewMockObjectDetector()
    edgeDecision := NewEdgeDecisionLayer(0.7, 5*time.Second)
    
    pipeline.AddProcessor(motionDetector)
    pipeline.AddProcessor(objectDetector)
    pipeline.AddProcessor(edgeDecision)
    
    // Event'leri topla
    events := make([]Event, 0)
    go func() {
        for event := range edgeDecision.Events() {
            events = append(events, event)
        }
    }()
    
    // Pipeline'ı çalıştır
    if err := pipeline.Run(ctx); err != nil {
        t.Fatalf("Pipeline başlatılamadı: %v", err)
    }
    
    // 10 saniye bekle
    time.Sleep(10 * time.Second)
    pipeline.Stop()
    
    // En az bir event bekleniyor
    if len(events) == 0 {
        t.Error("Hiç event üretilmedi")
    }
    
    // Event'lerin doğru formatta olduğunu kontrol et
    for _, event := range events {
        if event.Timestamp.IsZero() {
            t.Error("Event timestamp boş")
        }
        if event.Type == "" {
            t.Error("Event type boş")
        }
    }
}

13.3 End-to-End (E2E) Test

Tüm sistemin birlikte çalışmasını test eden E2E testler:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func TestE2EVideoProcessing(t *testing.T) {
    // Test ortamını başlat
    testEnv := setupTestEnvironment(t)
    defer testEnv.Cleanup()
    
    // MQTT broker başlat (test için)
    mqttBroker := startTestMQTTBroker(t)
    defer mqttBroker.Stop()
    
    // Video processing servisini başlat
    config := &Config{
        VideoSource: "rtsp://test-camera:554/stream",
        MQTTBroker:  "localhost:1883",
    }
    
    service := NewVideoProcessingService(config)
    if err := service.Start(); err != nil {
        t.Fatalf("Service başlatılamadı: %v", err)
    }
    defer service.Stop()
    
    // Test video stream'i gönder
    go sendTestVideoStream(t, "rtsp://test-camera:554/stream")
    
    // MQTT'dan event'leri dinle
    events := subscribeToMQTT(t, "video/events/+")
    
    // 30 saniye içinde event bekleniyor
    timeout := time.After(30 * time.Second)
    select {
    case event := <-events:
        // Event'i validate et
        if err := validateEvent(event); err != nil {
            t.Errorf("Geçersiz event: %v", err)
        }
        t.Logf("Event alındı: %+v", event)
    case <-timeout:
        t.Error("Event timeout - hiç event alınamadı")
    }
}

13.4 Load Testing

Sistemin yük altındaki performansını test eden load testler:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func TestLoadMultipleCameras(t *testing.T) {
    if testing.Short() {
        t.Skip("Skipping load test in short mode")
    }
    
    numCameras := 10
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    defer cancel()
    
    // Multi-camera manager oluştur
    manager := NewMultiCameraManager()
    
    // 10 kamera ekle
    for i := 0; i < numCameras; i++ {
        cameraID := fmt.Sprintf("camera-%d", i)
        sourceURL := fmt.Sprintf("rtsp://test-camera-%d:554/stream", i)
        
        if err := manager.AddCamera(cameraID, sourceURL, 640, 480); err != nil {
            t.Fatalf("Kamera eklenemedi: %v", err)
        }
    }
    
    // Tüm kameraları başlat
    if err := manager.StartAll(ctx); err != nil {
        t.Fatalf("Kameralar başlatılamadı: %v", err)
    }
    
    // 2 dakika çalıştır
    time.Sleep(2 * time.Minute)
    
    // Metrikleri topla
    metrics := manager.GetMetrics()
    
    // Performans kontrolü
    if metrics.AverageFPS < 20 {
        t.Errorf("Düşük FPS: %f (beklenen: >= 20)", metrics.AverageFPS)
    }
    
    if metrics.CPUUsage > 80 {
        t.Errorf("Yüksek CPU kullanımı: %f%% (beklenen: < 80%%)", metrics.CPUUsage)
    }
    
    if metrics.MemoryUsage > 500*1024*1024 { // 500MB
        t.Errorf("Yüksek bellek kullanımı: %d bytes", metrics.MemoryUsage)
    }
    
    manager.StopAll()
}

13.5 Stress Testing

Sistemin limitlerini test eden stress testler:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func TestStressHighFrameRate(t *testing.T) {
    if testing.Short() {
        t.Skip("Skipping stress test in short mode")
    }
    
    // Çok yüksek frame rate ile test
    highFPSSource := NewMockVideoSourceWithFPS(60) // 60 FPS
    
    pipeline := NewPipeline(highFPSSource)
    // Tüm processor'ları ekle
    
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
    defer cancel()
    
    // Frame drop sayısını izle
    frameDrops := atomic.Int64{}
    
    go func() {
        for frame := range pipeline.Frames() {
            if frame.Metadata["dropped"] == true {
                frameDrops.Add(1)
            }
        }
    }()
    
    if err := pipeline.Run(ctx); err != nil {
        t.Fatalf("Pipeline başlatılamadı: %v", err)
    }
    
    time.Sleep(30 * time.Second)
    pipeline.Stop()
    
    dropRate := float64(frameDrops.Load()) / 1800.0 // 30 saniye * 60 FPS
    if dropRate > 0.1 { // %10'dan fazla drop
        t.Logf("Yüksek frame drop oranı: %.2f%%", dropRate*100)
        // Bu durumda backpressure mekanizması çalışıyor demektir
    }
}

13.6 Pipeline Integration Test

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func TestPipeline(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    // Mock video source oluştur
    source := &MockVideoSource{
        frames: make(chan *Frame, 10),
    }
    
    // Test frame'leri ekle
    for i := 0; i < 5; i++ {
        source.frames <- &Frame{
            Data:      make([]byte, 640*480*3),
            Width:     640,
            Height:    480,
            Timestamp: time.Now(),
            Metadata:  make(map[string]interface{}),
        }
    }
    close(source.frames)
    
    // Pipeline oluştur
    pipeline := NewPipeline(source)
    motionDetector := NewMotionDetector(0.05, 640, 480)
    pipeline.AddProcessor(motionDetector)
    
    // Pipeline'ı çalıştır
    if err := pipeline.Run(ctx); err != nil {
        t.Fatalf("Pipeline başlatılamadı: %v", err)
    }
    
    // Sonuçları kontrol et
    time.Sleep(100 * time.Millisecond)
    pipeline.Stop()
}

13.7 Mock Video Source

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
type MockVideoSource struct {
    frames chan *Frame
    errors chan error
}

func (m *MockVideoSource) Start(ctx context.Context) error {
    return nil
}

func (m *MockVideoSource) Frames() <-chan *Frame {
    return m.frames
}

func (m *MockVideoSource) Errors() <-chan error {
    return m.errors
}

func (m *MockVideoSource) Stop() error {
    close(m.frames)
    close(m.errors)
    return nil
}

13.8 Benchmark Test

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func BenchmarkMotionDetection(b *testing.B) {
    detector := NewMotionDetector(0.05, 640, 480)
    frame := &Frame{
        Data:      make([]byte, 640*480*3),
        Width:     640,
        Height:    480,
        Timestamp: time.Now(),
        Metadata:  make(map[string]interface{}),
    }
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        detector.detectMotion(frame)
        // Frame'i değiştir
        frame.Data[i%len(frame.Data)]++
    }
}

// Çalıştırmak için: go test -bench=BenchmarkMotionDetection -benchmem

13.9 Rate Limiting Test

Rate limiting mekanizmasının doğru çalıştığını test etmek için:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import (
    "sync"
    "testing"
    "time"
    
    "golang.org/x/time/rate"
)

func TestRateLimiting(t *testing.T) {
    publisher := &MockPublisher{
        published: make([]*Event, 0),
    }
    
    // 10 event/saniye limit ile rate limiter oluştur
    limiter := rate.NewLimiter(10, 1) // 10 per second, burst 1
    rateLimited := &RateLimitedPublisher{
        publisher: publisher,
        limiter:   limiter,
    }
    
    // 20 event gönder
    start := time.Now()
    for i := 0; i < 20; i++ {
        event := &Event{
            Type:      "test",
            Timestamp: time.Now(),
            Data:      map[string]interface{}{"id": i},
        }
        if err := rateLimited.Publish(event); err != nil {
            t.Errorf("Publish error: %v", err)
        }
    }
    elapsed := time.Since(start)
    
    // 20 event için en az 1 saniye gerekli (10 event/sec limit)
    // Biraz tolerans ver (0.9 saniye)
    if elapsed < 900*time.Millisecond {
        t.Errorf("Rate limiting çalışmıyor: %v (beklenen: >= 1s)", elapsed)
    }
    
    // Tüm event'lerin yayınlandığını kontrol et
    if len(publisher.published) != 20 {
        t.Errorf("Beklenen 20 event, alınan: %d", len(publisher.published))
    }
}

type MockPublisher struct {
    published []*Event
    mutex     sync.Mutex
}

func (mp *MockPublisher) Publish(event *Event) error {
    mp.mutex.Lock()
    defer mp.mutex.Unlock()
    mp.published = append(mp.published, event)
    return nil
}

14. Configuration Management

Yapılandırma yönetimi için YAML veya TOML kullanabiliriz.

YAML Configuration

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import (
    "os"
    "gopkg.in/yaml.v3"
)

type Config struct {
    VideoSource struct {
        URL    string `yaml:"url"`
        Width  int    `yaml:"width"`
        Height int    `yaml:"height"`
    } `yaml:"video_source"`
    
    Processing struct {
        MotionThreshold float64 `yaml:"motion_threshold"`
        MinConfidence   float64 `yaml:"min_confidence"`
        TargetFPS       int     `yaml:"target_fps"`
    } `yaml:"processing"`
    
    MQTT struct {
        BrokerURL string `yaml:"broker_url"`
        Topic     string `yaml:"topic"`
        ClientID  string `yaml:"client_id"`
    } `yaml:"mqtt"`
    
    Server struct {
        MetricsPort string `yaml:"metrics_port"`
        HealthPort  string `yaml:"health_port"`
    } `yaml:"server"`
}

func LoadConfig(path string) (*Config, error) {
    data, err := os.ReadFile(path)
    if err != nil {
        return nil, err
    }
    
    var config Config
    if err := yaml.Unmarshal(data, &config); err != nil {
        return nil, err
    }
    
    return &config, nil
}

config.yaml Örneği

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
video_source:
  url: "rtsp://camera:554/stream"
  width: 640
  height: 480

processing:
  motion_threshold: 0.05
  min_confidence: 0.7
  target_fps: 10

mqtt:
  broker_url: "tcp://mqtt:1883"
  topic: "video/events"
  client_id: "video-processor"

server:
  metrics_port: "2112"
  health_port: "8080"

15. Health Checks ve Observability

Production ortamlarında health check endpoint’leri kritik öneme sahiptir.

Health Check Endpoint

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import (
    "encoding/json"
    "net/http"
    "sync/atomic"
)

type HealthStatus struct {
    Status    string            `json:"status"`
    Version   string            `json:"version"`
    Uptime    string            `json:"uptime"`
    Frames    int64             `json:"frames_processed"`
    Errors    int64             `json:"errors"`
    Details   map[string]string `json:"details,omitempty"`
}

type HealthChecker struct {
    startTime time.Time
    frames    int64
    errors    int64
    version   string
}

func NewHealthChecker(version string) *HealthChecker {
    return &HealthChecker{
        startTime: time.Now(),
        version:   version,
    }
}

func (hc *HealthChecker) RecordFrame() {
    atomic.AddInt64(&hc.frames, 1)
}

func (hc *HealthChecker) RecordError() {
    atomic.AddInt64(&hc.errors, 1)
}

func (hc *HealthChecker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    status := "healthy"
    
    // Son 1 dakikada frame işlenmediyse unhealthy
    frames := atomic.LoadInt64(&hc.frames)
    if frames == 0 {
        status = "unhealthy"
    }
    
    uptime := time.Since(hc.startTime).String()
    
    health := HealthStatus{
        Status:  status,
        Version: hc.version,
        Uptime:  uptime,
        Frames:  frames,
        Errors:  atomic.LoadInt64(&hc.errors),
    }
    
    w.Header().Set("Content-Type", "application/json")
    if status == "healthy" {
        w.WriteHeader(http.StatusOK)
    } else {
        w.WriteHeader(http.StatusServiceUnavailable)
    }
    
    json.NewEncoder(w).Encode(health)
}

// Kullanım
healthChecker := NewHealthChecker("1.0.0")
http.Handle("/health", healthChecker)
go http.ListenAndServe(":8080", nil)

Readiness ve Liveness Probes (Kubernetes)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
livenessProbe:
  httpGet:
    path: /health
    port: 8080
  initialDelaySeconds: 30
  periodSeconds: 10
  timeoutSeconds: 5
  failureThreshold: 3

readinessProbe:
  httpGet:
    path: /health
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 5
  timeoutSeconds: 3
  failureThreshold: 2

16. Backpressure Handling

Video pipeline’larında frame rate yüksek olduğunda channel buffer’lar dolabilir. Backpressure mekanizması ekleyelim.

Adaptive Frame Dropping

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import (
    "math"
    "math/rand"
    "sync"
    "sync/atomic"
)

type FrameDropper struct {
    dropRatio    float64
    bufferFull   bool
    droppedCount int64
    mutex        sync.RWMutex
}

func NewFrameDropper() *FrameDropper {
    return &FrameDropper{
        dropRatio: 0.0,
    }
}

func (fd *FrameDropper) ShouldDrop(bufferUsage float64) bool {
    fd.mutex.Lock()
    defer fd.mutex.Unlock()
    
    // Buffer %80'den fazla dolmuşsa frame drop başlat
    if bufferUsage > 0.8 {
        fd.bufferFull = true
        fd.dropRatio = math.Min(0.5, (bufferUsage-0.8)*2.5) // Max %50 drop
        return true
    }
    
    if fd.bufferFull && bufferUsage > 0.5 {
        // Buffer hala yüksek, drop devam et
        return rand.Float64() < fd.dropRatio
    }
    
    // Buffer normale döndü
    fd.bufferFull = false
    fd.dropRatio = 0.0
    return false
}

func (fd *FrameDropper) RecordDrop() {
    atomic.AddInt64(&fd.droppedCount, 1)
}

Channel Buffer Monitoring

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func monitorChannelBuffer(ch <-chan *Frame, bufferSize int) float64 {
    currentLen := len(ch)
    return float64(currentLen) / float64(bufferSize)
}

// Pipeline içinde kullanım
select {
case out <- frame:
    // Frame gönderildi
case <-time.After(10 * time.Millisecond):
    // Timeout - buffer dolu olabilir
    bufferUsage := monitorChannelBuffer(out, cap(out))
    if frameDropper.ShouldDrop(bufferUsage) {
        frameDropper.RecordDrop()
        // Frame'i atla
        continue
    }
    // Retry
    select {
    case out <- frame:
    case <-ctx.Done():
        return
    }
}

17. Network Optimizasyonu ve Video Streaming Protokolleri

Edge processing sistemlerinde network optimizasyonu kritik öneme sahiptir. Video akışları yüksek bant genişliği gerektirir ve network gecikmeleri sistem performansını doğrudan etkiler.

17.1 Video Streaming Protokolleri

RTSP (Real-Time Streaming Protocol)

RTSP, IP kameralar için en yaygın protokoldür:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// RTSP URL formatı
rtspURL := "rtsp://username:password@192.168.1.100:554/stream1"

// FFmpeg ile RTSP bağlantısı
args := []string{
    "-rtsp_transport", "tcp",  // TCP kullan (UDP yerine)
    "-i", rtspURL,
    "-vf", "scale=640:480",
    "-r", "15",  // Frame rate limit
    "-pix_fmt", "rgb24",
    "-f", "rawvideo",
    "pipe:1",
}

RTSP Optimizasyonları:

  • TCP transport kullan (UDP packet loss’u önler)
  • Timeout ayarları yap (bağlantı kopmalarında hızlı recovery)
  • Reconnection logic ekle (otomatik yeniden bağlanma)

WebRTC (Web Real-Time Communication)

WebRTC, browser tabanlı real-time streaming için idealdir:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import (
    "bytes"
    "image"
    "image/color"
    "image/jpeg"
    "sync"
    "github.com/gorilla/websocket"
)

// Frame'i JPEG'e çevir (helper function)
func frameToJPEG(frame *Frame, quality int) ([]byte, error) {
    img := image.NewRGBA(image.Rect(0, 0, frame.Width, frame.Height))
    
    for y := 0; y < frame.Height; y++ {
        for x := 0; x < frame.Width; x++ {
            idx := (y*frame.Width + x) * 3
            if idx+2 >= len(frame.Data) {
                continue
            }
            img.Set(x, y, color.RGBA{
                R: frame.Data[idx],
                G: frame.Data[idx+1],
                B: frame.Data[idx+2],
                A: 255,
            })
        }
    }
    
    var buf bytes.Buffer
    if err := jpeg.Encode(&buf, img, &jpeg.Options{Quality: quality}); err != nil {
        return nil, err
    }
    return buf.Bytes(), nil
}

// WebRTC için frame'leri WebSocket üzerinden gönderme
type WebRTCStreamer struct {
    clients map[string]*websocket.Conn
    mutex   sync.RWMutex
}

func (ws *WebRTCStreamer) BroadcastFrame(frame *Frame) {
    ws.mutex.RLock()
    defer ws.mutex.RUnlock()
    
    // Frame'i JPEG'e çevir
    jpegData, err := frameToJPEG(frame, 80)
    if err != nil {
        return
    }
    
    // Tüm client'lara gönder
    for _, conn := range ws.clients {
        if err := conn.WriteMessage(websocket.BinaryMessage, jpegData); err != nil {
            // Hatalı bağlantıyı kaldır
            delete(ws.clients, conn.RemoteAddr().String())
        }
    }
}

17.2 Bandwidth Optimizasyonu

Adaptive Bitrate Streaming

Network koşullarına göre kaliteyi dinamik olarak ayarlama:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import (
    "fmt"
    "sync"
)

type AdaptiveBitrate struct {
    currentQuality string
    bandwidth      float64  // Mbps
    frameRate      int
    resolution     string
    mutex          sync.RWMutex
}

func (ab *AdaptiveBitrate) AdjustQuality(measuredBandwidth float64) {
    ab.mutex.Lock()
    defer ab.mutex.Unlock()
    
    // Bandwidth'e göre kalite seviyesi belirle
    if measuredBandwidth < 1.0 {
        ab.resolution = "320x240"
        ab.frameRate = 10
    } else if measuredBandwidth < 3.0 {
        ab.resolution = "640x480"
        ab.frameRate = 15
    } else {
        ab.resolution = "1280x720"
        ab.frameRate = 30
    }
    
    ab.bandwidth = measuredBandwidth
}

func (ab *AdaptiveBitrate) GetFFmpegArgs() []string {
    ab.mutex.RLock()
    defer ab.mutex.RUnlock()
    
    width, height := parseResolution(ab.resolution)
    
    return []string{
        "-vf", fmt.Sprintf("scale=%d:%d", width, height),
        "-r", fmt.Sprintf("%d", ab.frameRate),
    }
}

func parseResolution(resolution string) (int, int) {
    var width, height int
    fmt.Sscanf(resolution, "%dx%d", &width, &height)
    return width, height
}

<|tool▁calls▁begin|><|tool▁call▁begin|> read_file

Frame Throttling

Network yükünü azaltmak için frame’leri throttle etme:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type FrameThrottler struct {
    maxFPS      int
    lastFrame   time.Time
    minInterval time.Duration
    mutex       sync.Mutex
}

func NewFrameThrottler(maxFPS int) *FrameThrottler {
    return &FrameThrottler{
        maxFPS:      maxFPS,
        minInterval: time.Second / time.Duration(maxFPS),
    }
}

func (ft *FrameThrottler) ShouldProcess() bool {
    ft.mutex.Lock()
    defer ft.mutex.Unlock()
    
    now := time.Now()
    if now.Sub(ft.lastFrame) >= ft.minInterval {
        ft.lastFrame = now
        return true
    }
    return false
}

17.3 Network Resilience

Connection Retry ve Backoff

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type NetworkRetry struct {
    maxRetries int
    backoff    time.Duration
    mutex      sync.Mutex
}

func (nr *NetworkRetry) ConnectWithRetry(connectFn func() error) error {
    nr.mutex.Lock()
    defer nr.mutex.Unlock()
    
    backoff := nr.backoff
    for i := 0; i < nr.maxRetries; i++ {
        if err := connectFn(); err == nil {
            return nil
        }
        
        if i < nr.maxRetries-1 {
            time.Sleep(backoff)
            backoff *= 2  // Exponential backoff
        }
    }
    
    return fmt.Errorf("connection failed after %d retries", nr.maxRetries)
}

Network Quality Monitoring

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
type NetworkMonitor struct {
    latency     []time.Duration
    packetLoss  float64
    bandwidth   float64
    mutex       sync.RWMutex
}

func (nm *NetworkMonitor) MeasureLatency() time.Duration {
    start := time.Now()
    // Ping veya test paketi gönder
    // ...
    latency := time.Since(start)
    
    nm.mutex.Lock()
    nm.latency = append(nm.latency, latency)
    if len(nm.latency) > 100 {
        nm.latency = nm.latency[1:]
    }
    nm.mutex.Unlock()
    
    return latency
}

func (nm *NetworkMonitor) GetAverageLatency() time.Duration {
    nm.mutex.RLock()
    defer nm.mutex.RUnlock()
    
    if len(nm.latency) == 0 {
        return 0
    }
    
    var sum time.Duration
    for _, l := range nm.latency {
        sum += l
    }
    return sum / time.Duration(len(nm.latency))
}

17.4 Video Compression Stratejileri

Edge’den cloud’a gönderilen event’lerde frame’leri sıkıştırma:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// JPEG kalite ayarına göre compression
func compressFrame(frame *Frame, quality int) ([]byte, error) {
    img := &image.RGBA{
        Pix:    frame.Data,
        Stride: frame.Width * 3,
        Rect:   image.Rect(0, 0, frame.Width, frame.Height),
    }
    
    var buf bytes.Buffer
    opts := &jpeg.Options{Quality: quality}
    
    if err := jpeg.Encode(&buf, img, opts); err != nil {
        return nil, err
    }
    
    return buf.Bytes(), nil
}

// Quality seviyeleri:
// - 90-100: Yüksek kalite, büyük dosya
// - 70-89:  Orta kalite, orta dosya
// - 50-69:  Düşük kalite, küçük dosya

18. Deployment ve Containerization

Dockerfile

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# Build aşaması - Multi-stage build ile cache optimization
FROM golang:1.23-alpine AS builder

WORKDIR /app

# FFmpeg ve OpenCV bağımlılıkları
RUN apk add --no-cache \
    ffmpeg \
    pkgconfig \
    gstreamer \
    gstreamer-dev \
    opencv-dev

# Dependencies önce (cache için - değişmeyen dosyalar)
COPY go.mod go.sum ./
RUN go mod download

# Sonra kod (değişen dosyalar)
COPY . .

# Binary'yi optimize ederek derle
RUN CGO_ENABLED=1 GOOS=linux go build \
    -ldflags="-w -s" \
    -o video-processor ./cmd/main.go

# Runtime image - Minimal ve güvenli
FROM alpine:latest

RUN apk add --no-cache \
    ffmpeg \
    ca-certificates \
    libc6-compat

# Non-root user oluştur (security best practice)
RUN addgroup -g 1000 appuser && \
    adduser -D -u 1000 -G appuser appuser

WORKDIR /app

# Binary'yi builder'dan kopyala
COPY --from=builder /app/video-processor /app/video-processor

# Ownership'i değiştir
RUN chown -R appuser:appuser /app

# Non-root user olarak çalıştır
USER appuser

CMD ["/app/video-processor"]

Docker Compose Örneği

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
version: '3.8'

services:
  video-processor:
    build: .
    environment:
      - RTSP_URL=rtsp://camera:554/stream
      - MQTT_BROKER=mqtt://broker:1883
      - LOG_LEVEL=info
    volumes:
      - ./config:/app/config
    restart: unless-stopped
    
  mqtt-broker:
    image: eclipse-mosquitto:latest
    ports:
      - "1883:1883"

K3s/Kubernetes Deployment

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
apiVersion: apps/v1
kind: Deployment
metadata:
  name: video-processor
spec:
  replicas: 1
  selector:
    matchLabels:
      app: video-processor
  template:
    metadata:
      labels:
        app: video-processor
    spec:
      containers:
      - name: video-processor
        image: video-processor:latest
        env:
        - name: RTSP_URL
          valueFrom:
            configMapKeyRef:
              name: video-config
              key: rtsp-url
        resources:
          requests:
            memory: "256Mi"
            cpu: "500m"
          limits:
            memory: "512Mi"
            cpu: "1000m"

18.3 CI/CD Pipeline

Production deployment için CI/CD pipeline örneği:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# .github/workflows/deploy.yml
name: Build and Deploy

on:
  push:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-go@v4
        with:
          go-version: '1.23'
      
      - name: Run tests
        run: |
          go test -v -race -coverprofile=coverage.out ./...          
      
      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.out
  
  build:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      
      - name: Build Docker image
        run: |
          docker build -t video-processor:${{ github.sha }} .
          docker tag video-processor:${{ github.sha }} video-processor:latest          
      
      - name: Push to registry
        run: |
          echo "${{ secrets.DOCKER_PASSWORD }}" | docker login -u "${{ secrets.DOCKER_USERNAME }}" --password-stdin
          docker push video-processor:${{ github.sha }}
          docker push video-processor:latest          
  
  deploy:
    needs: build
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Kubernetes
        run: |
          kubectl set image deployment/video-processor \
            video-processor=video-processor:${{ github.sha }} \
            -n production          

GitLab CI örneği:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# .gitlab-ci.yml
stages:
  - test
  - build
  - deploy

test:
  stage: test
  image: golang:1.23
  script:
    - go test -v -race ./...
    - go vet ./...

build:
  stage: build
  image: docker:latest
  services:
    - docker:dind
  script:
    - docker build -t $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA .
    - docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
    - docker tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA $CI_REGISTRY_IMAGE:latest
    - docker push $CI_REGISTRY_IMAGE:latest

deploy:
  stage: deploy
  image: bitnami/kubectl:latest
  script:
    - kubectl set image deployment/video-processor video-processor=$CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
  only:
    - main

18.4 Blue-Green Deployment

Sıfır downtime deployment için blue-green stratejisi:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# blue-green-deployment.yaml
apiVersion: v1
kind: Service
metadata:
  name: video-processor-service
spec:
  selector:
    app: video-processor
    version: blue  # veya green
  ports:
  - port: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: video-processor-blue
spec:
  replicas: 3
  template:
    metadata:
      labels:
        app: video-processor
        version: blue
    spec:
      containers:
      - name: video-processor
        image: video-processor:v1.0.0
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: video-processor-green
spec:
  replicas: 0  # Başlangıçta kapalı
  template:
    metadata:
      labels:
        app: video-processor
        version: green
    spec:
      containers:
      - name: video-processor
        image: video-processor:v1.1.0

Blue-green deployment script:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#!/bin/bash
# blue-green-deploy.sh

CURRENT_VERSION=$(kubectl get service video-processor-service -o jsonpath='{.spec.selector.version}')
NEW_VERSION=$([ "$CURRENT_VERSION" == "blue" ] && echo "green" || echo "blue")
NEW_IMAGE="video-processor:v1.2.0"

echo "Current version: $CURRENT_VERSION"
echo "Deploying to: $NEW_VERSION"

# Yeni versiyonu scale up
kubectl scale deployment video-processor-$NEW_VERSION --replicas=3
kubectl set image deployment/video-processor-$NEW_VERSION video-processor=$NEW_IMAGE

# Health check
echo "Waiting for new version to be ready..."
kubectl wait --for=condition=available --timeout=300s deployment/video-processor-$NEW_VERSION

# Service'i yeni versiyona yönlendir
kubectl patch service video-processor-service -p "{\"spec\":{\"selector\":{\"version\":\"$NEW_VERSION\"}}}"

# Eski versiyonu scale down
kubectl scale deployment video-processor-$CURRENT_VERSION --replicas=0

echo "Deployment complete. New version: $NEW_VERSION"

18.5 Rollback Mekanizması

Hızlı rollback için mekanizma:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main

import (
    "context"
    "fmt"
    "os/exec"
    "time"
)

type DeploymentManager struct {
    currentVersion string
    previousVersion string
    kubeconfig     string
}

func (dm *DeploymentManager) Rollback() error {
    if dm.previousVersion == "" {
        return fmt.Errorf("no previous version to rollback to")
    }
    
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    defer cancel()
    
    // Eski versiyona geri dön
    cmd := exec.CommandContext(ctx, "kubectl", 
        "set", "image", "deployment/video-processor",
        fmt.Sprintf("video-processor=video-processor:%s", dm.previousVersion),
    )
    
    if err := cmd.Run(); err != nil {
        return fmt.Errorf("rollback failed: %w", err)
    }
    
    // Health check
    if err := dm.waitForDeployment(ctx); err != nil {
        return fmt.Errorf("rollback health check failed: %w", err)
    }
    
    dm.currentVersion, dm.previousVersion = dm.previousVersion, dm.currentVersion
    return nil
}

func (dm *DeploymentManager) waitForDeployment(ctx context.Context) error {
    cmd := exec.CommandContext(ctx, "kubectl", 
        "wait", "--for=condition=available",
        "--timeout=300s", "deployment/video-processor",
    )
    return cmd.Run()
}

Kubernetes rollback komutu:

1
2
3
4
5
6
7
8
# Son başarılı revision'a dön
kubectl rollout undo deployment/video-processor

# Belirli bir revision'a dön
kubectl rollout undo deployment/video-processor --to-revision=3

# Rollout geçmişini görüntüle
kubectl rollout history deployment/video-processor

18.6 Canary Deployment

Kademeli deployment için canary stratejisi:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
apiVersion: v1
kind: Service
metadata:
  name: video-processor
spec:
  selector:
    app: video-processor
  ports:
  - port: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: video-processor-stable
spec:
  replicas: 9
  template:
    metadata:
      labels:
        app: video-processor
        version: stable
    spec:
      containers:
      - name: video-processor
        image: video-processor:v1.0.0
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: video-processor-canary
spec:
  replicas: 1  # %10 trafik
  template:
    metadata:
      labels:
        app: video-processor
        version: canary
    spec:
      containers:
      - name: video-processor
        image: video-processor:v1.1.0

18.7 Pre-Deployment Checklist

Production’a deploy etmeden önce kontrol listesi:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import (
    "context"
    "fmt"
    "os/exec"
    "time"
)

type PreDeploymentChecklist struct {
    checks []Check
}

type Check struct {
    Name        string
    Description string
    Validate    func() error
}

func NewPreDeploymentChecklist() *PreDeploymentChecklist {
    return &PreDeploymentChecklist{
        checks: []Check{
            {
                Name:        "Unit Tests",
                Description: "Tüm unit testler geçiyor mu?",
                Validate: func() error {
                    cmd := exec.Command("go", "test", "./...")
                    return cmd.Run()
                },
            },
            {
                Name:        "Integration Tests",
                Description: "Integration testler geçiyor mu?",
                Validate: func() error {
                    cmd := exec.Command("go", "test", "-tags=integration", "./...")
                    return cmd.Run()
                },
            },
            {
                Name:        "Security Scan",
                Description: "Güvenlik taraması yapıldı mı?",
                Validate: func() error {
                    // Trivy, Snyk gibi araçlarla scan
                    return nil
                },
            },
            {
                Name:        "Performance Test",
                Description: "Performance testleri geçiyor mu?",
                Validate: func() error {
                    // Load test sonuçlarını kontrol et
                    return nil
                },
            },
        },
    }
}

func (pdc *PreDeploymentChecklist) Run() error {
    for _, check := range pdc.checks {
        fmt.Printf("Checking: %s...\n", check.Name)
        if err := check.Validate(); err != nil {
            return fmt.Errorf("%s failed: %w", check.Name, err)
        }
        fmt.Printf("✓ %s passed\n", check.Name)
    }
    return nil
}

19. Güvenlik

Production ortamlarında güvenlik kritik öneme sahiptir. Edge processing sistemleri hassas video verilerini işlediği için güvenlik katmanları mutlaka uygulanmalıdır.

19.1 Credential Management ve Secret Rotation

RTSP URL’lerinde kullanıcı adı ve şifre kullanılıyorsa, environment variable veya secret management kullanın:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import (
    "crypto/tls"
    "fmt"
    "net/url"
    "os"
    "time"
)

type CredentialManager struct {
    secrets     map[string]string
    lastRotated time.Time
    mutex       sync.RWMutex
}

func NewCredentialManager() *CredentialManager {
    cm := &CredentialManager{
        secrets: make(map[string]string),
    }
    cm.loadSecrets()
    return cm
}

func (cm *CredentialManager) loadSecrets() {
    // Kubernetes secrets veya HashiCorp Vault'tan yükle
    cm.secrets["rtsp_username"] = os.Getenv("RTSP_USERNAME")
    cm.secrets["rtsp_password"] = os.Getenv("RTSP_PASSWORD")
}

func (cm *CredentialManager) GetRTSPURL(baseURL string) string {
    cm.mutex.RLock()
    defer cm.mutex.RUnlock()
    
    username := cm.secrets["rtsp_username"]
    password := cm.secrets["rtsp_password"]
    
    if username != "" && password != "" {
        return fmt.Sprintf("rtsp://%s:%s@%s", username, password, baseURL)
    }
    return baseURL
}

// Log'larda credential'ları maskelemek için yardımcı fonksiyon
func maskCredentials(rawURL string) string {
    u, err := url.Parse(rawURL)
    if err != nil {
        return rawURL
    }
    if u.User != nil {
        u.User = url.UserPassword("***", "***")
    }
    return u.String()
}

// Secret rotation - periyodik olarak secret'ları yenile
func (cm *CredentialManager) RotateSecrets() {
    cm.mutex.Lock()
    defer cm.mutex.Unlock()
    
    // Yeni secret'ları yükle
    cm.loadSecrets()
    cm.lastRotated = time.Now()
}

19.2 RTSP Authentication ve TLS

RTSP bağlantılarını güvenli hale getirmek için:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import (
    "crypto/tls"
    "net/url"
)

type SecureRTSPClient struct {
    tlsConfig *tls.Config
}

func NewSecureRTSPClient(certFile, keyFile string) (*SecureRTSPClient, error) {
    cert, err := tls.LoadX509KeyPair(certFile, keyFile)
    if err != nil {
        return nil, err
    }
    
    return &SecureRTSPClient{
        tlsConfig: &tls.Config{
            Certificates: []tls.Certificate{cert},
            MinVersion:   tls.VersionTLS12,
        },
    }, nil
}

// RTSP over TLS (RTSPS) kullanımı
func (src *SecureRTSPClient) GetSecureRTSPURL(baseURL string) string {
    u, err := url.Parse(baseURL)
    if err != nil {
        return baseURL
    }
    
    // RTSPS (RTSP over TLS) kullan
    if u.Scheme == "rtsp" {
        u.Scheme = "rtsps"
    }
    
    return u.String()
}

19.3 Video Stream Encryption (SRTP)

Gerçek zamanlı video akışlarını şifrelemek için SRTP (Secure Real-time Transport Protocol) kullanın:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import (
    "crypto/aes"
    "crypto/cipher"
    "crypto/rand"
    "io"
)

type StreamEncryptor struct {
    aesgcm cipher.AEAD
    nonce  []byte
}

func NewStreamEncryptor(key []byte) (*StreamEncryptor, error) {
    block, err := aes.NewCipher(key)
    if err != nil {
        return nil, err
    }
    
    aesgcm, err := cipher.NewGCM(block)
    if err != nil {
        return nil, err
    }
    
    nonce := make([]byte, aesgcm.NonceSize())
    if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
        return nil, err
    }
    
    return &StreamEncryptor{
        aesgcm: aesgcm,
        nonce:  nonce,
    }, nil
}

func (se *StreamEncryptor) EncryptFrame(frameData []byte) ([]byte, error) {
    return se.aesgcm.Seal(nil, se.nonce, frameData, nil), nil
}

func (se *StreamEncryptor) DecryptFrame(encryptedData []byte) ([]byte, error) {
    return se.aesgcm.Open(nil, se.nonce, encryptedData, nil)
}

19.4 API Authentication & Authorization

HTTP API endpoint’leri için authentication ve authorization:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import (
    "fmt"
    "net/http"
    "time"
    
    "github.com/golang-jwt/jwt/v5"
)

type AuthMiddleware struct {
    secretKey []byte
}

func NewAuthMiddleware(secretKey string) *AuthMiddleware {
    return &AuthMiddleware{
        secretKey: []byte(secretKey),
    }
}

func (am *AuthMiddleware) Authenticate(next http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        token := r.Header.Get("Authorization")
        if token == "" {
            http.Error(w, "Unauthorized", http.StatusUnauthorized)
            return
        }
        
        // JWT token doğrula
        claims, err := am.validateToken(token)
        if err != nil {
            http.Error(w, "Invalid token", http.StatusUnauthorized)
            return
        }
        
        // Role-based authorization
        if !am.hasPermission(claims.Role, r.URL.Path) {
            http.Error(w, "Forbidden", http.StatusForbidden)
            return
        }
        
        next(w, r)
    }
}

func (am *AuthMiddleware) validateToken(tokenString string) (*Claims, error) {
    token, err := jwt.ParseWithClaims(tokenString, &Claims{}, func(token *jwt.Token) (interface{}, error) {
        return am.secretKey, nil
    })
    
    if err != nil {
        return nil, err
    }
    
    if claims, ok := token.Claims.(*Claims); ok && token.Valid {
        return claims, nil
    }
    
    return nil, fmt.Errorf("invalid token")
}

func (am *AuthMiddleware) hasPermission(role, path string) bool {
    // Basit role-based authorization
    // Production'da daha karmaşık bir sistem kullanılabilir
    
    // Admin tüm endpoint'lere erişebilir
    if role == "admin" {
        return true
    }
    
    // User sadece read-only endpoint'lere erişebilir
    if role == "user" {
        return path == "/api/stats" || path == "/api/health"
    }
    
    // Viewer sadece preview endpoint'lerine erişebilir
    if role == "viewer" {
        return path == "/preview" || path == "/preview/list"
    }
    
    return false
}

type Claims struct {
    Username string `json:"username"`
    Role     string `json:"role"`
    jwt.RegisteredClaims
}

19.5 Container Security Best Practices

Docker container’ları için güvenlik önlemleri:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
# Non-root user kullan
RUN groupadd -r appuser && useradd -r -g appuser appuser
USER appuser

# Minimal base image kullan
FROM gcr.io/distroless/base-debian11

# Sadece gerekli port'ları expose et
EXPOSE 8080

# Read-only filesystem (mümkünse)
# docker run --read-only --tmpfs /tmp

# Security scanning
# docker scan <image-name>

Kubernetes Security Context:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
apiVersion: v1
kind: Pod
spec:
  securityContext:
    runAsNonRoot: true
    runAsUser: 1000
    fsGroup: 1000
    seccompProfile:
      type: RuntimeDefault
  containers:
  - name: video-processor
    securityContext:
      allowPrivilegeEscalation: false
      readOnlyRootFilesystem: true
      capabilities:
        drop:
        - ALL

19.6 Certificate Management

TLS sertifikalarının yönetimi:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import (
    "crypto/tls"
    "crypto/x509"
    "time"
)

type CertificateManager struct {
    cert     *tls.Certificate
    certPool *x509.CertPool
    mutex    sync.RWMutex
}

func NewCertificateManager(certFile, keyFile, caFile string) (*CertificateManager, error) {
    cert, err := tls.LoadX509KeyPair(certFile, keyFile)
    if err != nil {
        return nil, err
    }
    
    caCert, err := os.ReadFile(caFile)
    if err != nil {
        return nil, err
    }
    
    certPool := x509.NewCertPool()
    if !certPool.AppendCertsFromPEM(caCert) {
        return nil, fmt.Errorf("failed to parse CA certificate")
    }
    
    return &CertificateManager{
        cert:     &cert,
        certPool: certPool,
    }, nil
}

// Certificate expiration kontrolü
func (cm *CertificateManager) CheckExpiration() (time.Duration, error) {
    cm.mutex.RLock()
    defer cm.mutex.RUnlock()
    
    cert, err := x509.ParseCertificate(cm.cert.Certificate[0])
    if err != nil {
        return 0, err
    }
    
    return time.Until(cert.NotAfter), nil
}

// Sertifika yenileme logic'i için placeholder (ör. ACME / Let's Encrypt)
func (cm *CertificateManager) renewCertificate() error {
    // Gerçek bir sistemde ACME client implementasyonu veya harici bir certificate manager ile entegrasyon yapılır
    return fmt.Errorf("certificate renewal not implemented")
}

// Auto-renewal için
func (cm *CertificateManager) StartAutoRenewal() {
    go func() {
        ticker := time.NewTicker(24 * time.Hour)
        defer ticker.Stop()
        
        for range ticker.C {
            timeLeft, err := cm.CheckExpiration()
            if err != nil {
                continue
            }
            
            // 30 günden az kaldıysa yenile
            if timeLeft < 30*24*time.Hour {
                cm.renewCertificate()
            }
        }
    }()
}

19.7 Input Validation ve Sanitization

Frame data’yı işlemeden önce validate edin:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import (
    "errors"
    "fmt"
)

func validateFrame(frame *Frame) error {
    if frame == nil {
        return errors.New("frame is nil")
    }
    if len(frame.Data) == 0 {
        return errors.New("frame data is empty")
    }
    expectedSize := frame.Width * frame.Height * 3
    if len(frame.Data) != expectedSize {
        return fmt.Errorf("frame size mismatch: expected %d, got %d", expectedSize, len(frame.Data))
    }
    
    // Boyut limitleri
    if frame.Width > 4096 || frame.Height > 4096 {
        return errors.New("frame dimensions too large")
    }
    
    return nil
}

19.8 Security Monitoring ve Audit Logging

Güvenlik olaylarını loglama:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
type SecurityLogger struct {
    logger *logrus.Logger
}

func (sl *SecurityLogger) LogSecurityEvent(eventType string, details map[string]interface{}) {
    sl.logger.WithFields(logrus.Fields{
        "event_type": eventType,
        "timestamp":  time.Now(),
        "details":    details,
    }).Warn("Security event")
}

// Kullanım örnekleri:
// sl.LogSecurityEvent("authentication_failure", map[string]interface{}{
//     "ip": "192.168.1.100",
//     "username": "admin",
// })
// sl.LogSecurityEvent("unauthorized_access", map[string]interface{}{
//     "endpoint": "/api/frames",
//     "user": "user123",
// })

20. Tam Örnek Uygulama

Tüm parçaları bir araya getiren örnek:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    // Signal handling
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    
    go func() {
        <-sigChan
        log.Println("Shutting down...")
        cancel()
    }()
    
    // Video kaynağını oluştur
    source := NewVideoSource("rtsp://camera:554/stream", 640, 480)
    
    // Pipeline oluştur
    pipeline := NewPipeline(source)
    
    // Motion detector ekle
    motionDetector := NewMotionDetector(0.05, 640, 480) // %5 threshold
    pipeline.AddProcessor(motionDetector)
    
    // Object detector ekle (opsiyonel, hata olursa graceful degradation)
    // objectDetector, err := NewObjectDetector("model.onnx", "config.yaml", "classes.txt")
    // if err == nil {
    //     pipeline.AddProcessor(objectDetector)
    // }
    
    // Edge decision layer ekle
    mqttPub, err := NewMQTTPublisher("tcp://mqtt:1883", "video/events", "video-processor")
    if err != nil {
        log.Fatalf("Failed to create MQTT publisher: %v", err)
    }
    
    decisionLayer := NewEdgeDecisionLayer(0.7, mqttPub, 5*time.Second)
    pipeline.AddProcessor(decisionLayer)
    
    // Pipeline'ı başlat (goroutine'de)
    pipelineDone := make(chan error, 1)
    go func() {
        pipelineDone <- pipeline.Run(ctx)
    }()
    
    // Shutdown sinyalini bekle
    <-ctx.Done()
    log.Println("Shutting down gracefully...")
    
    // Graceful shutdown için timeout
    shutdownTimeout := 30 * time.Second
    shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
    defer shutdownCancel()
    
    // Pipeline'ı durdur (zorunlu timeout fallback ile)
    shutdownDone := make(chan error, 1)
    go func() {
        shutdownDone <- pipeline.Shutdown(shutdownCtx)
    }()

    select {
    case err := <-shutdownDone:
        if err != nil {
            log.Printf("Shutdown error: %v", err)
        } else {
            log.Println("Pipeline stopped successfully")
        }
    case <-time.After(30 * time.Second):
        log.Println("Force killing after shutdown timeout!")
        os.Exit(1)
    }
    
    // MQTT bağlantısını kapat (best effort)
    if err := mqttPub.Close(); err != nil {
        log.Printf("MQTT close error: %v", err)
    }
    
    // Pipeline'ın tamamlanmasını bekle
    select {
    case err := <-pipelineDone:
        if err != nil {
            log.Printf("Pipeline error: %v", err)
        }
    default:
    }
    
    log.Println("Application stopped")
}

21. Frame Recording ve Storage

Önemli event’lerde frame’leri kaydetmek için:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
type FrameRecorder struct {
    storagePath string
    maxFrames   int
    frames      []*Frame
    mutex       sync.Mutex
}

func (fr *FrameRecorder) RecordFrame(frame *Frame) error {
    fr.mutex.Lock()
    defer fr.mutex.Unlock()
    
    if len(fr.frames) >= fr.maxFrames {
        // En eski frame'i kaldır
        fr.frames = fr.frames[1:]
    }
    
    fr.frames = append(fr.frames, frame)
    return nil
}

func (fr *FrameRecorder) SaveEvent(eventID string) error {
    fr.mutex.Lock()
    frames := make([]*Frame, len(fr.frames))
    copy(frames, fr.frames)
    fr.mutex.Unlock()
    
    // Frame'leri JPEG olarak kaydet
    for i, frame := range frames {
        jpegData, err := FrameToJPEG(frame, 90)
        if err != nil {
            return err
        }
        filename := fmt.Sprintf("%s/%s_frame_%d.jpg", fr.storagePath, eventID, i)
        if err := os.WriteFile(filename, jpegData, 0644); err != nil {
            return err
        }
    }
    
    return nil
}

22. Alerting ve Bildirimler

Event’ler için bildirim mekanizması:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
type AlertManager struct {
    notifiers []Notifier
}

type Notifier interface {
    Send(alert *Alert) error
}

type Alert struct {
    Level       string
    Message     string
    Event       *Event
    Timestamp   time.Time
}

// Email notifier
type EmailNotifier struct {
    smtpServer string
    recipients []string
}

// Slack notifier
type SlackNotifier struct {
    webhookURL string
}

// SMS notifier (Twilio, vb.)
type SMSNotifier struct {
    apiKey string
    phone  string
}

func (am *AlertManager) SendAlert(alert *Alert) {
    for _, notifier := range am.notifiers {
        go func(n Notifier) {
            if err := n.Send(alert); err != nil {
                log.Printf("Alert gönderilemedi: %v", err)
            }
        }(notifier)
    }
}

23. Hardware Gereksinimleri

Minimum Gereksinimler (Motion Detection Only)

  • CPU: 2 core, 1.5 GHz+
  • RAM: 512 MB
  • Storage: 1 GB (OS + uygulama)
  • Network: 10 Mbps

Önerilen Gereksinimler (Object Detection)

  • CPU: 4 core, 2.0 GHz+
  • RAM: 2 GB
  • Storage: 5 GB
  • Network: 100 Mbps
  • GPU (opsiyonel): NVIDIA Jetson Nano, Intel Neural Compute Stick

Production Gereksinimleri (Multi-Camera)

  • CPU: 8+ core, 2.5 GHz+
  • RAM: 8 GB+
  • Storage: 50 GB+ (event recording için)
  • Network: 1 Gbps
  • GPU: NVIDIA Jetson Xavier NX veya dedicated AI accelerator

24. Gerçek Dünya Deneyimleri ve Karşılaşılan Zorluklar

Bu projeyi geliştirirken karşılaştığımız bazı zorluklar ve çözümler:

24.1 FFmpeg Process Yönetimi

Problem: FFmpeg process’i beklenmedik şekilde crash olabiliyor veya bağlantı kopabiliyor.

Çözüm: Sürekli monitoring ve otomatik restart mekanizması:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (vs *VideoSource) StartWithMonitoring(ctx context.Context) error {
    for {
        if err := vs.Start(ctx); err != nil {
            log.Printf("FFmpeg başlatılamadı: %v, 5 saniye sonra tekrar deneniyor...", err)
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(5 * time.Second):
                continue
            }
        }
        
        // Process'in çalışıp çalışmadığını kontrol et
        select {
        case <-ctx.Done():
            return ctx.Err()
        case err := <-vs.Errors():
            log.Printf("FFmpeg hatası: %v, yeniden başlatılıyor...", err)
            vs.Stop()
            time.Sleep(2 * time.Second)
        }
    }
}

24.2 Memory Leak’ler

Problem: Frame buffer’ları düzgün temizlenmediğinde bellek kullanımı sürekli artıyor.

Çözüm: sync.Pool kullanımı ve düzenli GC çağrıları:

1
2
3
4
5
6
7
8
9
// Her 1000 frame'de bir GC çağır
frameCount := 0
for frame := range frames {
    frameCount++
    if frameCount%1000 == 0 {
        runtime.GC()
    }
    // Frame işleme...
}

24.3 Network Bağlantı Sorunları

Problem: RTSP bağlantıları zaman zaman kopuyor, özellikle WiFi üzerinden.

Çözüm: TCP transport kullanımı ve connection timeout ayarları:

1
2
# UDP yerine TCP kullan (daha güvenilir ama biraz daha yavaş)
ffmpeg -rtsp_transport tcp -i rtsp://...

24.4 Frame Rate Tutarsızlıkları

Problem: Kaynak 30 FPS sağlıyor ama pipeline sadece 10-15 FPS işleyebiliyor.

Çözüm: Frame skipping ve adaptive processing:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type AdaptiveProcessor struct {
    targetFPS     int
    currentFPS    float64
    skipRatio     int
    lastFrameTime time.Time
}

func (ap *AdaptiveProcessor) ShouldProcess() bool {
    now := time.Now()
    if ap.lastFrameTime.IsZero() {
        ap.lastFrameTime = now
        return true
    }
    
    elapsed := now.Sub(ap.lastFrameTime)
    currentFPS := 1.0 / elapsed.Seconds()
    
    if currentFPS > float64(ap.targetFPS)*1.2 {
        // Çok hızlı, bazı frame'leri atla
        ap.skipRatio++
        return ap.skipRatio%2 == 0
    }
    
    ap.skipRatio = 0
    ap.lastFrameTime = now
    return true
}

24.5 Object Detection Gecikmeleri

Problem: Object detection model’i çok yavaş, pipeline’ı tıkıyor.

Çözüm: Async processing ve frame buffering:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (od *ObjectDetector) ProcessAsync(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
    // Detection'ları async olarak çalıştır
    detectionResults := make(map[int]*Frame)
    resultMutex := sync.RWMutex{}
    
    frameID := 0
    for frame := range in {
        currentID := frameID
        frameID++
        
        // Frame'i hemen geçir (detection olmadan)
        select {
        case out <- frame:
        case <-ctx.Done():
            return
        }
        
        // Detection'ı arka planda yap
        go func(f *Frame, id int) {
            detections := od.detect(f)
            resultMutex.Lock()
            detectionResults[id] = f
            f.Metadata["detections"] = detections
            resultMutex.Unlock()
        }(frame, currentID)
    }
}

24.6 Multi-Camera Yönetimi

Problem: Birden fazla kameradan gelen stream’leri yönetmek karmaşık.

Çözüm: Camera manager pattern:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
type CameraManager struct {
    cameras map[string]*VideoSource
    pipelines map[string]*Pipeline
    mutex sync.RWMutex
}

func (cm *CameraManager) AddCamera(id, url string, width, height int) error {
    cm.mutex.Lock()
    defer cm.mutex.Unlock()
    
    source := NewVideoSource(url, width, height)
    pipeline := NewPipeline(source)
    
    cm.cameras[id] = source
    cm.pipelines[id] = pipeline
    
    return pipeline.Run(context.Background())
}

func (cm *CameraManager) RemoveCamera(id string) error {
    cm.mutex.Lock()
    defer cm.mutex.Unlock()
    
    if pipeline, exists := cm.pipelines[id]; exists {
        pipeline.Stop()
        delete(cm.pipelines, id)
    }
    
    if source, exists := cm.cameras[id]; exists {
        source.Stop()
        delete(cm.cameras, id)
    }
    
    return nil
}

25. Troubleshooting Rehberi

25.1 FFmpeg Bağlantı Hatası

Problem: Connection refused veya Connection timeout

Çözüm:

  1. RTSP URL’ini kontrol edin: ffmpeg -rtsp_transport tcp -i <url> -t 1 -f null -
  2. Firewall kurallarını kontrol edin
  3. Kamera ayarlarında RTSP port’unun açık olduğundan emin olun
  4. TCP transport kullanmayı deneyin (UDP yerine)

25.2 Yüksek CPU Kullanımı

Problem: CPU %100’e yakın, frame rate düşüyor

Çözüm:

  1. Frame rate’i düşürün (target FPS ayarı)
  2. Çözünürlüğü azaltın (640x480 → 320x240)
  3. Object detection’ı devre dışı bırakın veya async yapın
  4. Daha güçlü hardware kullanın

25.3 Memory Leak

Problem: Bellek kullanımı sürekli artıyor

Çözüm:

  1. sync.Pool kullanımını kontrol edin
  2. Frame buffer’ların düzgün temizlendiğinden emin olun
  3. Periyodik GC çağrıları ekleyin
  4. pprof ile memory profile alın

26. Maliyet Analizi

26.1 Cloud vs Edge Processing Karşılaştırması

Cloud Video Processing (AWS Rekognition, Azure Video Analyzer):

  • Video başına: ~$0.10-0.50/dakika
  • 10 kamera, 24/7: ~$1,440-7,200/ay
  • Network maliyeti: ~$200-500/ay
  • Toplam: ~$1,640-7,700/ay

Edge Processing (Bu Sistem):

  • Edge device: ~$50-200 (bir kerelik)
  • Cloud storage (sadece event’ler): ~$10-50/ay
  • Network (sadece event’ler): ~$5-20/ay
  • Toplam: ~$15-70/ay + bir kerelik hardware

Tasarruf: %95-99 maliyet azalması (hardware maliyeti hariç)

27. Production Best Practices

27.1 Resource Limits

Docker/Kubernetes’te resource limit’leri mutlaka ayarlayın:

1
2
3
4
5
6
7
resources:
  requests:
    memory: "512Mi"
    cpu: "1000m"
  limits:
    memory: "1Gi"
    cpu: "2000m"

27.2 Graceful Shutdown

Sistem kapanırken tüm goroutine’lerin düzgün şekilde sonlanmasını sağlayın:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func (p *Pipeline) Shutdown(ctx context.Context) error {
    // Timeout / cancellation için dışarıdan verilen context'i kullan
    done := make(chan error, 1)
    go func() {
        done <- p.Stop()
    }()

    select {
    case err := <-done:
        return err
    case <-ctx.Done():
        return ctx.Err()
    }
}

27.3 Circuit Breaker Pattern

External servislere (object detection API) bağlanırken circuit breaker kullanın:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import "github.com/sony/gobreaker"

type CircuitBreakerDetector struct {
    detector *ExternalDetector
    cb       *gobreaker.CircuitBreaker
}

func NewCircuitBreakerDetector(detector *ExternalDetector) *CircuitBreakerDetector {
    cb := gobreaker.NewCircuitBreaker(gobreaker.Settings{
        Name:    "ObjectDetection",
        Timeout: 5 * time.Second,
        ReadyToTrip: func(counts gobreaker.Counts) bool {
            return counts.ConsecutiveFailures > 5
        },
    })
    
    return &CircuitBreakerDetector{
        detector: detector,
        cb:       cb,
    }
}

func (cbd *CircuitBreakerDetector) Detect(frame *Frame) ([]Detection, error) {
    result, err := cbd.cb.Execute(func() (interface{}, error) {
        return cbd.detector.callInferenceService(frame)
    })
    
    if err != nil {
        return nil, err
    }
    
    return result.([]Detection), nil
}

27.4 Rate Limiting

Event gönderiminde rate limiting kullanın:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import "golang.org/x/time/rate"

type RateLimitedPublisher struct {
    publisher EventPublisher
    limiter   *rate.Limiter
}

func NewRateLimitedPublisher(publisher EventPublisher, eventsPerSecond int) *RateLimitedPublisher {
    return &RateLimitedPublisher{
        publisher: publisher,
        limiter:   rate.NewLimiter(rate.Limit(eventsPerSecond), eventsPerSecond),
    }
}

func (rlp *RateLimitedPublisher) Publish(event *Event) error {
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()
    
    if err := rlp.limiter.Wait(ctx); err != nil {
        return fmt.Errorf("rate limit exceeded: %w", err)
    }
    
    return rlp.publisher.Publish(event)
}

27.5 Structured Logging

Tüm log’ları structured format’ta tutun:

1
2
3
4
5
6
7
8
logger.WithFields(logrus.Fields{
    "camera_id":    cameraID,
    "frame_id":     frameID,
    "event_type":   event.Type,
    "confidence":   event.Confidence,
    "processing_ms": processingTime.Milliseconds(),
    "timestamp":    time.Now().Unix(),
}).Info("Event detected")

27.6 Metrics Collection

Önemli metrikleri toplayın ve export edin:

  • Frame processing rate (FPS)
  • Detection accuracy
  • Error rates
  • Memory usage
  • CPU usage
  • Network latency
  • Event publish success rate

27.7 Configuration Management (Viper)

Sensitive bilgileri environment variable veya secret management’tan alın:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import "github.com/spf13/viper"

func LoadConfig() (*Config, error) {
    viper.SetConfigName("config")
    viper.SetConfigType("yaml")
    viper.AddConfigPath(".")
    viper.AddConfigPath("/etc/video-processor/")
    
    // Environment variable'ları override edebilir
    viper.SetEnvPrefix("VIDEO")
    viper.AutomaticEnv()
    
    if err := viper.ReadInConfig(); err != nil {
        return nil, err
    }
    
    var config Config
    if err := viper.Unmarshal(&config); err != nil {
        return nil, err
    }
    
    return &config, nil
}

28. Alternatif Yaklaşımlar ve Trade-off’lar

28.1 FFmpeg vs GStreamer

FFmpeg:

  • ✅ Daha yaygın, iyi dokümante edilmiş
  • ✅ Çok sayıda format desteği
  • ❌ Process overhead (her frame için pipe I/O)
  • ❌ Biraz daha yavaş

GStreamer:

  • ✅ Daha optimize, daha hızlı
  • ✅ Plugin mimarisi
  • ❌ Daha az dokümante
  • ❌ Kurulum daha karmaşık

Karar: FFmpeg’i seçtik çünkü daha yaygın ve dokümante edilmiş. GStreamer daha performanslı olabilir ama FFmpeg bizim ihtiyaçlarımız için yeterli.

28.2 GoCV vs External Service

GoCV (In-process):

  • ✅ Düşük latency
  • ✅ Network bağımlılığı yok
  • ❌ Model binary’ye dahil (büyük binary)
  • ❌ CPU kullanımı yüksek

External Service:

  • ✅ Model’i ayrı serviste çalıştırma (GPU kullanımı kolay)
  • ✅ Daha küçük binary
  • ❌ Network latency
  • ❌ Ek servis yönetimi

Karar: Küçük modeller için GoCV, büyük modeller için external service kullanıyoruz.

28.3 MQTT vs Kafka vs Webhook

MQTT:

  • ✅ Hafif, edge için ideal
  • ✅ Low latency
  • ✅ QoS desteği
  • ❌ Message ordering garantisi yok

Kafka:

  • ✅ Yüksek throughput
  • ✅ Message ordering
  • ❌ Daha ağır, edge için fazla
  • ❌ Daha yüksek latency

Webhook:

  • ✅ Basit HTTP
  • ✅ Kolay entegrasyon
  • ❌ Retry mekanizması yok
  • ❌ Network bağımlılığı

Karar: Edge’den cloud’a event gönderimi için MQTT, yüksek throughput gerektiğinde Kafka kullanıyoruz.

28.4 FFmpeg Yerine Alternatif Yaklaşımlar

FFmpeg haricinde Go tabanlı video işleme kütüphaneleri de mevcuttur:

github.com/3d0c/gmf (Go Media Framework):

  • ✅ Pure Go implementasyonu
  • ✅ FFmpeg bağımlılığı yok
  • ❌ Format desteği sınırlı
  • ❌ Aktif geliştirme yavaş

github.com/asticode/go-astits:

  • ✅ MPEG-TS stream parsing
  • ✅ Pure Go
  • ❌ Sadece MPEG-TS formatı

Neden FFmpeg Tercih Edildi?

  1. Format desteği: 100+ video/audio formatı
  2. Stabilite: Yıllardır production’da kullanılıyor
  3. Performans: C ile yazılmış, optimize edilmiş
  4. Dokümantasyon: Kapsamlı ve güncel
  5. Topluluk desteği: Geniş kullanıcı tabanı

Karar: FFmpeg’i seçtik çünkü format desteği, stabilite ve performans kritikti. Go tabanlı alternatifler gelecekte daha olgunlaşabilir.

29. Machine Learning Model Deployment

Edge processing sistemlerinde ML model’lerinin deployment’ı kritik bir konudur. Model versioning, A/B testing ve canary deployment stratejileri:

29.1 Model Versioning

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
type ModelManager struct {
    models      map[string]*Model
    activeModel string
    mutex       sync.RWMutex
}

type Model struct {
    Version     string
    Path        string
    LoadedAt    time.Time
    Performance ModelMetrics
}

func (mm *ModelManager) LoadModel(version, path string) error {
    mm.mutex.Lock()
    defer mm.mutex.Unlock()
    
    // Model'i yükle
    model := &Model{
        Version:  version,
        Path:     path,
        LoadedAt: time.Now(),
    }
    
    mm.models[version] = model
    
    // İlk model ise aktif yap
    if mm.activeModel == "" {
        mm.activeModel = version
    }
    
    return nil
}

func (mm *ModelManager) SwitchModel(version string) error {
    mm.mutex.Lock()
    defer mm.mutex.Unlock()
    
    if _, exists := mm.models[version]; !exists {
        return fmt.Errorf("model version %s not found", version)
    }
    
    mm.activeModel = version
    return nil
}

29.2 A/B Testing

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type ABTester struct {
    modelA      *Model
    modelB      *Model
    splitRatio  float64  // 0.0-1.0, modelA için
    results     map[string]ABResult
    mutex       sync.RWMutex
}

type ABResult struct {
    ModelAAccuracy float64
    ModelBAccuracy float64
    ModelALatency  time.Duration
    ModelBLatency  time.Duration
}

func (ab *ABTester) Predict(frame *Frame) ([]Detection, error) {
    // Rastgele model seç (split ratio'ya göre)
    useModelA := rand.Float64() < ab.splitRatio
    
    var model *Model
    var modelName string
    if useModelA {
        model = ab.modelA
        modelName = "A"
    } else {
        model = ab.modelB
        modelName = "B"
    }
    
    start := time.Now()
    detections, err := model.Predict(frame)
    latency := time.Since(start)
    
    // Sonuçları kaydet
    ab.recordResult(modelName, detections, latency)
    
    return detections, err
}

29.3 Model Performance Monitoring

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type ModelMetrics struct {
    TotalInferences   int64
    AverageLatency    time.Duration
    Accuracy          float64
    FalsePositives    int64
    FalseNegatives    int64
    mutex             sync.RWMutex
}

func (mm *ModelMetrics) RecordInference(latency time.Duration, correct bool) {
    mm.mutex.Lock()
    defer mm.mutex.Unlock()
    
    mm.TotalInferences++
    
    // Moving average latency
    if mm.AverageLatency == 0 {
        mm.AverageLatency = latency
    } else {
        mm.AverageLatency = (mm.AverageLatency + latency) / 2
    }
    
    // Accuracy hesapla
    if correct {
        mm.Accuracy = float64(mm.TotalInferences-mm.FalsePositives-mm.FalseNegatives) / float64(mm.TotalInferences)
    }
}

29.4 Donanım Hızlandırma ve GPU Desteği

Edge AI cihazlarıyla entegrasyon için GPU desteği kritik öneme sahiptir:

NVIDIA Jetson Entegrasyonu

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import "gocv.io/x/gocv"

func NewGPUAcceleratedDetector(modelPath string) (*ObjectDetector, error) {
    net := gocv.ReadNet(modelPath, "")
    if net.Empty() {
        return nil, fmt.Errorf("failed to load model")
    }
    
    // CUDA backend kullan
    net.SetPreferableBackend(gocv.NetBackendCUDA)
    net.SetPreferableTarget(gocv.NetTargetCUDA)
    
    return &ObjectDetector{net: net}, nil
}

Intel Neural Compute Stick (OpenVINO)

1
2
3
// OpenVINO backend ile
net.SetPreferableBackend(gocv.NetBackendOpenVINO)
net.SetPreferableTarget(gocv.NetTargetVPU) // Intel NCS için

CUDA, OpenCL, Vulkan Karşılaştırması

API Platform Performans Edge Uygunluğu
CUDA NVIDIA ⭐⭐⭐⭐⭐ Jetson serisi
OpenCL Cross-platform ⭐⭐⭐ Genel GPU’lar
Vulkan Cross-platform ⭐⭐⭐⭐ Modern GPU’lar

Öneri: NVIDIA Jetson için CUDA, genel kullanım için OpenVINO tercih edilir.

30. Video Quality Metrics

Video kalitesini ölçmek için çeşitli metrikler kullanılabilir:

30.1 PSNR (Peak Signal-to-Noise Ratio)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import "math"

func CalculatePSNR(original, processed *Frame) float64 {
    mse := calculateMSE(original, processed)
    if mse == 0 {
        return math.Inf(1)  // Perfect match
    }
    
    maxPixelValue := 255.0
    psnr := 20 * math.Log10(maxPixelValue/math.Sqrt(mse))
    return psnr
}

func calculateMSE(original, processed *Frame) float64 {
    if len(original.Data) != len(processed.Data) {
        return math.MaxFloat64
    }
    
    var sum float64
    for i := 0; i < len(original.Data); i++ {
        diff := float64(original.Data[i]) - float64(processed.Data[i])
        sum += diff * diff
    }
    
    return sum / float64(len(original.Data))
}

30.2 SSIM (Structural Similarity Index)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import "math"

func CalculateSSIM(original, processed *Frame) float64 {
    // Basitleştirilmiş SSIM implementasyonu
    // Tam implementasyon için luminance, contrast, structure hesaplamaları gerekir
    
    mu1 := calculateMean(original)
    mu2 := calculateMean(processed)
    
    sigma1 := calculateStdDev(original, mu1)
    sigma2 := calculateStdDev(processed, mu2)
    sigma12 := calculateCovariance(original, processed, mu1, mu2)
    
    c1 := 0.01 * 255 * 0.01 * 255
    c2 := 0.03 * 255 * 0.03 * 255
    
    numerator := (2*mu1*mu2 + c1) * (2*sigma12 + c2)
    denominator := (mu1*mu1 + mu2*mu2 + c1) * (sigma1*sigma1 + sigma2*sigma2 + c2)
    
    return numerator / denominator
}

func calculateMean(frame *Frame) float64 {
    var sum float64
    for _, v := range frame.Data {
        sum += float64(v)
    }
    return sum / float64(len(frame.Data))
}

func calculateStdDev(frame *Frame, mean float64) float64 {
    var sum float64
    for _, v := range frame.Data {
        diff := float64(v) - mean
        sum += diff * diff
    }
    return math.Sqrt(sum / float64(len(frame.Data)))
}

func calculateCovariance(f1, f2 *Frame, mean1, mean2 float64) float64 {
    var sum float64
    minLen := len(f1.Data)
    if len(f2.Data) < minLen {
        minLen = len(f2.Data)
    }
    
    for i := 0; i < minLen; i++ {
        sum += (float64(f1.Data[i]) - mean1) * (float64(f2.Data[i]) - mean2)
    }
    return sum / float64(minLen)
}

31. Load Balancing ve High Availability

Production ortamlarında yüksek erişilebilirlik için load balancing ve failover mekanizmaları:

31.1 Multi-Instance Deployment

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
type LoadBalancer struct {
    instances []*VideoProcessor
    current   int
    mutex     sync.Mutex
}

func (lb *LoadBalancer) ProcessFrame(frame *Frame) error {
    lb.mutex.Lock()
    instance := lb.instances[lb.current]
    lb.current = (lb.current + 1) % len(lb.instances)
    lb.mutex.Unlock()
    
    return instance.Process(frame)
}

func (lb *LoadBalancer) HealthCheck() {
    lb.mutex.Lock()
    defer lb.mutex.Unlock()
    
    for i, instance := range lb.instances {
        if !instance.IsHealthy() {
            // Sağlıksız instance'ı listeden çıkar
            lb.removeInstance(i)
        }
    }
}

func (lb *LoadBalancer) removeInstance(index int) {
    if index < 0 || index >= len(lb.instances) {
        return
    }
    
    // Instance'ı listeden çıkar
    lb.instances = append(lb.instances[:index], lb.instances[index+1:]...)
    
    // Current index'i güncelle
    if lb.current >= len(lb.instances) {
        lb.current = 0
    }
}

31.2 Failover Mechanism

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
type FailoverManager struct {
    primary   *VideoProcessor
    secondary *VideoProcessor
    active    *VideoProcessor
    mutex     sync.RWMutex
}

func (fm *FailoverManager) ProcessFrame(frame *Frame) error {
    fm.mutex.RLock()
    active := fm.active
    fm.mutex.RUnlock()
    
    err := active.Process(frame)
    if err != nil {
        // Failover
        fm.switchToSecondary()
        return fm.active.Process(frame)
    }
    
    return nil
}

func (fm *FailoverManager) switchToSecondary() {
    fm.mutex.Lock()
    defer fm.mutex.Unlock()
    
    if fm.active == fm.primary {
        fm.active = fm.secondary
    } else {
        fm.active = fm.primary
    }
}

32. Sonuç

Bu makalede, production ortamında kullanılan bir gerçek zamanlı video analiz sisteminin temel bileşenlerini ve mimarisini paylaştık. Go ile edge processing yaklaşımı, yüksek performanslı, ölçeklenebilir ve maliyet etkin sistemler geliştirmeyi mükemmel bir şekilde mümkün kılar. Go’nun goroutine modeli, video işleme pipeline’ları için ideal bir yapı sunar ve channel’lar üzerinden frame akışını kolayca yönetmemizi sağlar.

32.1 Proje Deneyimlerimiz

Bu sistemi geliştirirken ve production’da kullanırken edindiğimiz temel deneyimler:

  1. Modüler mimari kritik - Her bileşenin bağımsız çalışabilmesi, sistemin bakımını ve geliştirmesini kolaylaştırıyor
  2. Error handling ve resilience - Production’da her şey yolunda gitmeyebilir, bu yüzden graceful degradation ve retry mekanizmaları şart
  3. Monitoring olmadan kör uçuyorsunuz - Sistemin sağlığını ve performansını sürekli izlemek gerekiyor
  4. Edge processing gerçekten maliyet tasarrufu sağlıyor - Cloud processing’e göre %95+ tasarruf mümkün
  5. Go’nun eşzamanlılık modeli video işleme için ideal - Goroutine ve channel yapısı, pipeline’ları yönetmeyi çok kolaylaştırıyor
  6. C’den Go’ya geçiş deneyimi - C ile yazılmış sistemin Go ile yeniden yazılması, hem performans hem de geliştirici deneyimi açısından önemli iyileştirmeler sağladı

32.2 Özet

  • Edge processing yaklaşımı, gecikmeyi azaltır ve bant genişliği kullanımını optimize eder
  • Go’nun eşzamanlılık modeli, video pipeline’larını verimli bir şekilde yönetmemizi sağlar
  • Modüler mimari, farklı analiz tekniklerini kolayca ekleyip çıkarabilmemizi sağlar
  • Graceful degradation ve error handling, production ortamlarında güvenilir sistemler kurmamıza olanak tanır
  • ROI detection ve network optimizasyonu gibi teknikler, sistem performansını önemli ölçüde artırır
  • Machine learning model deployment stratejileri, sistemin sürekli gelişmesini sağlar

32.3 Sonraki Adımlar

  1. GPU desteği - NVIDIA Jetson, Intel Neural Compute Stick gibi edge AI cihazlarıyla entegrasyon (Bölüm 29.4’te başlangıç yapıldı)
  2. Distributed processing - Birden fazla kamera için distributed edge processing
  3. Real-time streaming - Analiz edilmiş frame’leri gerçek zamanlı olarak stream etme
  4. Advanced analytics - Yüz tanıma, davranış analizi, tracking algoritmaları
  5. Edge-to-cloud sync - Analiz sonuçlarının cloud’da birleştirilmesi ve raporlama
  6. WebRTC entegrasyonu - Browser tabanlı real-time preview ve kontrol
  7. Advanced compression - AV1 codec desteği ve adaptive compression

32.4 İleri Seviye Konular (Ayrı Makale Konuları)

Bu makale temel ve orta seviye konuları kapsamaktadır. Aşağıdaki konular için ayrı makaleler önerilir:

Dağıtık Edge Processing:

  • Leader election ve consensus algoritmaları
  • Edge-cloud senkronizasyonu (eventual consistency)
  • Multi-edge device koordinasyonu

CI/CD ve GitOps:

  • Multi-architecture Docker image’ları (arm/v7, arm64, amd64)
  • ArgoCD, Flux ile edge deployment
  • Automated testing ve deployment pipelines

Monitoring & Observability:

  • Distributed tracing (Jaeger, OpenTelemetry)
  • Anomali tespiti için metrik analiz
  • Advanced alerting stratejileri

Güvenlik Derinlemesine:

  • SRTP, DTLS video stream şifreleme
  • ONVIF, digest auth kamera kimlik doğrulama
  • Container güvenliği (AppArmor, Seccomp)

Yasal ve Etik:

  • GDPR, KVKK uyumluluğu
  • Video kayıt saklama ve imha politikaları
  • Privacy-by-design yaklaşımları

Performance Optimization:

  • Model quantization, pruning, distillation
  • Hardware codec entegrasyonu (VA-API, VDPAU, NVDEC)
  • Memory management optimizasyonları (HugeTLB, alignment)

Network Protocols:

  • SRT (Secure Reliable Transport)
  • QUIC protokolü üzerinden streaming
  • Time-Sensitive Networking (TSN) endüstriyel ortamlar için

32.4 Kod Örnekleri ve Repository

Bu makalede paylaşılan kod örnekleri, production ortamında kullanılan bir sistemin temel bileşenlerinden oluşmaktadır. Tam implementasyon ve ek özellikler için:

Not: Kod örnekleri ve tam implementasyon, proje gereksinimlerine göre özelleştirilmiştir. Bu makaledeki örnekler, sistemin temel mimarisini ve yaklaşımını göstermektedir.

32.6 Öğrenme Kaynakları

32.7 Sonuç ve Öneriler

Bu sistem, production ortamında başarıyla çalışmaktadır ve aşağıdaki avantajları sağlamıştır:

  1. %95+ maliyet tasarrufu - Cloud processing yerine edge processing
  2. <100ms latency - Gerçek zamanlı karar verme
  3. Ölçeklenebilir mimari - Her kamera bağımsız çalışır
  4. Güvenilir sistem - Graceful degradation ve error recovery

Başlangıç için öneriler:

  • Önce tek kamera ile başlayın
  • Motion detection ile test edin
  • Object detection’ı sonra ekleyin
  • Production’a geçmeden önce load test yapın
  • Monitoring ve alerting’i mutlaka kurun

Not: Bu makale teknik bir rehber olarak hazırlanmıştır. Üretim ortamında kullanmadan önce kapsamlı test yapılması önerilir.