Go ile Real-Time Video Analizi ve Edge Processing
Özet
- Edge Processing: Veriyi merkeze göndermeden lokal olarak işleme
- Go + Video Pipeline: Goroutine ve channel yapısı ile yüksek performanslı video işleme
- Production Ready: Motion detection, object detection, event publishing, monitoring
- Maliyet Tasarrufu: Cloud processing’e göre %95+ tasarruf
Not: Bu makale, production ortamında kullanılan bir video analiz sisteminin temel bileşenlerini paylaşmaktadır. Kod örnekleri ve mimari kararlar, gerçek proje deneyimlerinden yola çıkılarak hazırlanmıştır.
1. Giriş: Video Verisi ve Edge Processing İhtiyacı
Günümüzde video, üretilen verinin en büyük bölümünü oluşturuyor. CCTV sistemleri, canlı yayın altyapıları, akıllı şehirler, endüstriyel kameralar ve IoT cihazları; gerçek zamanlı (real-time) video analizi ihtiyacını kaçınılmaz hale getiriyor.
Bu noktada iki temel problem ortaya çıkıyor:
- Gecikme (latency) - Veriyi merkeze göndermek zaman alır
- Bant genişliği ve maliyet - Ham video akışları oldukça pahalıdır
Bu problemleri çözmenin en etkili yollarından biri Edge Processing yaklaşımıdır.
Proje Hikayesi
Bu makaleyi yazma gereği, daha önce C dili ile geliştirdiğim bir video analiz sistemini Go (Golang) ile yeniden yazma deneyimimden doğdu. C ile yazılmış sistem, performans açısından mükemmel sonuçlar veriyordu ve production ortamında başarıyla çalışıyordu. Ancak, sistemin bakımı ve geliştirilmesi süreçlerinde, Go’nun sağladığı bazı avantajların bu tür bir proje için ne kadar değerli olduğunu fark ettim.
Go ile yeniden yazma sürecinde, C’de elde ettiğim performansa çok yakın, pratikte kabul edilebilir bir performans seviyesi yakaladım. Bunun yanında, goroutine ve channel yapısının video pipeline’ları için ne kadar uygun olduğunu keşfettim. Go’nun eşzamanlılık modeli, C’deki thread yönetimi ve memory management yaklaşımlarına göre daha temiz ve güvenli bir geliştirici deneyimi sunuyor. Ayrıca Go’nun cross-platform derleme yeteneği, sistemin farklı edge cihazlarında (Raspberry Pi, x86, ARM) çalışmasını çok kolaylaştırdı. GC ve runtime overhead nedeniyle teorik olarak küçük bir fark olsa da, geliştirme hızı ve bakım kolaylığı bu farkı fazlasıyla telafi etti.
Bu deneyim sonucunda, Go ile edge video processing sisteminin hem performans hem de geliştirici deneyimi açısından güçlü bir alternatif olduğunu gördüm. C’nin performans avantajları tartışılmaz, ancak Go’nun sunduğu modern dil özellikleri ve geliştirici dostu yaklaşımı, bu tür sistemler için önemli bir tercih sebebi olabilir. Bu makale, bu deneyimlerden yola çıkarak, benzer sistemler geliştirmek isteyen geliştiricilere rehberlik etmek amacıyla hazırlanmıştır.
2. Edge Processing Nedir?
Edge processing, verinin merkezî bir sunucuya gönderilmeden, üretildiği noktaya (kamera, gateway, edge node) yakın bir yerde işlenmesidir.
2.1 Avantajları
- Düşük gecikme - Veri lokal olarak işlendiği için milisaniyeler içinde sonuç alınır
- Daha az bant genişliği kullanımı - Sadece analiz sonuçları (event’ler) gönderilir, ham video gönderilmez
- Daha iyi veri gizliliği - Hassas görüntüler merkeze gönderilmez
- Ölçeklenebilir mimari - Her edge node bağımsız çalışır, merkezi sistemde tek nokta başarısızlığı olmaz
- Maliyet etkinlik - Cloud compute maliyetleri azalır
2.2 Kullanım Senaryoları
- Akıllı güvenlik sistemleri - Anomali tespiti, yüz tanıma
- Endüstriyel kalite kontrolü - Üretim hattında hızlı karar verme
- Traffic yönetimi - Araç sayımı, trafik analizi
- Retail analytics - Müşteri davranış analizi, heat map oluşturma
3. Neden Go (Golang)?
Go, edge ve real-time sistemler için oldukça güçlü bir adaydır:
- Yüksek performans - Native binary, düşük GC overhead, C/C++ seviyesinde hız
- Eşzamanlılık - Goroutine & channel yapısı ile eşzamanlı video pipeline’ları kolayca yönetilir
- Düşük bellek kullanımı - Edge cihazların kısıtlı kaynakları için ideal
- Cross-platform derleme - Linux ARM (Raspberry Pi), x86, Windows için tek kaynak kodu
- Kolay containerization - Docker, k3s, Kubernetes Edge (K3s) ile uyumlu
- Statik binary - Tüm bağımlılıklar binary’de, dağıtım kolaylığı
- Zengin standart kütüphane -
net, context, sync paketleri video işleme için mükemmel
4. Genel Mimari
5. Hızlı Başlangıç Rehberi
Sistemi kurmak ve çalıştırmak için adım adım rehber:
5.1 Gereksinimler
1
2
3
4
5
6
7
8
9
|
# Go 1.21+ kurulu olmalı
go version
# FFmpeg kurulu olmalı
ffmpeg -version
# (Opsiyonel) OpenCV ve GoCV için
# Linux: apt-get install libopencv-dev
# macOS: brew install opencv
|
5.2 Proje Yapısı
1
2
3
4
5
6
7
8
9
10
11
12
|
video-processor/
├── cmd/
│ └── main.go
├── internal/
│ ├── pipeline/
│ ├── detector/
│ ├── source/
│ └── publisher/
├── config/
│ └── config.yaml
├── go.mod
└── Dockerfile
|
5.3 Temel Kurulum
1
2
3
4
5
6
7
8
9
10
11
|
# Projeyi oluştur
mkdir video-processor && cd video-processor
go mod init video-processor
# Bağımlılıkları ekle
go get github.com/eclipse/paho.mqtt.golang
go get gopkg.in/yaml.v3
go get github.com/sirupsen/logrus
# (Opsiyonel) GoCV
go get gocv.io/x/gocv
|
GoCV kurulumu platforma göre farklılık gösterir:
Linux (Ubuntu/Debian)
1
2
3
4
5
6
7
8
9
10
11
12
|
# OpenCV bağımlılıklarını kur
sudo apt-get update
sudo apt-get install -y \
libopencv-dev \
pkg-config \
libavcodec-dev \
libavformat-dev \
libavutil-dev \
libswscale-dev
# GoCV'yi kur
go get gocv.io/x/gocv
|
macOS
1
2
3
4
5
|
# Homebrew ile OpenCV kur
brew install opencv pkg-config
# GoCV'yi kur
go get gocv.io/x/gocv
|
Windows
1
2
3
4
5
6
7
8
9
|
# Chocolatey ile OpenCV kur (veya manuel)
choco install opencv
# Environment variable ayarla
set CGO_CPPFLAGS=-IC:\opencv\build\include
set CGO_LDFLAGS=-LC:\opencv\build\x64\vc15\lib
# GoCV'yi kur
go get gocv.io/x/gocv
|
Docker ile Taşınabilirlik
OpenCV bağımlılıklarını Docker ile yönetmek en kolay yöntemdir:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
# Build stage
FROM golang:1.23 AS builder
# OpenCV bağımlılıklarını kur
RUN apt-get update && apt-get install -y \
libopencv-dev \
pkg-config \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=1 go build -o video-processor ./cmd/main.go
|
Karşılaşılan Sorunlar ve Çözümleri:
- CGO hatası:
CGO_ENABLED=1 olmadan derleme yapılamaz
- pkg-config bulunamadı:
pkg-config paketinin kurulu olduğundan emin olun
- OpenCV versiyonu uyumsuzluğu: GoCV belirli OpenCV versiyonlarıyla çalışır, versiyon kontrolü yapın
- Cross-compilation zorluğu: ARM için native build yapın veya Docker kullanın
Önemli Cross-Compilation Notu (ARM / Edge Cihazlar):
- GoCV, CGO kullandığı için x86 bir makineden doğrudan
GOOS=linux GOARCH=arm64 go build ... komutuyla Raspberry Pi / Jetson gibi ARM cihazlara cross-compile etmek çoğu zaman sorunsuz çalışmaz.
- Derleme sırasında hem Go tarafında hem de native OpenCV kütüphanelerinde doğru mimariyi hedeflemek gerekir; bu da pratikte oldukça karmaşıktır.
- Önerilen yaklaşımlar:
- Mümkünse ARM cihazın üzerinde native build yapın (örneğin direkt Raspberry Pi üzerinde
go build).
- Veya Docker buildx + QEMU kullanarak multi-arch Docker image üretin (amd64 host’tan arm64 image build etmek için).
- Production senaryolarında, her mimari için ayrı build pipeline tanımlamak (amd64, armv7, arm64) çoğu zaman en sağlıklı çözümdür.
5.4 İlk Test
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
// cmd/main.go
package main
import (
"context"
"log"
"time"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Test video source (dosyadan)
source := NewVideoSource("file://test.mp4", 640, 480)
if err := source.Start(ctx); err != nil {
log.Fatal(err)
}
// İlk 10 frame'i oku
frameCount := 0
for frame := range source.Frames() {
log.Printf("Frame %d alındı: %dx%d", frameCount, frame.Width, frame.Height)
frameCount++
if frameCount >= 10 {
break
}
}
source.Stop()
log.Println("Test tamamlandı!")
}
|
5.5 Production’a Geçiş Checklist
Production’a geçmeden önce aşağıdaki checklist’i kontrol edin:
- Error handling ve retry mekanizmaları eklendi
- Logging ve monitoring kuruldu
- Health check endpoint’leri çalışıyor
- Configuration dosyaları hazır
- Docker image build edildi ve test edildi
- Load test yapıldı
- Memory leak test edildi
- Graceful shutdown çalışıyor
- Alerting mekanizması kuruldu
- Backup ve recovery planı hazır
Edge processing için doğru codec ve format seçimi kritik öneme sahiptir. Farklı codec’lerin avantaj ve dezavantajları:
6.1 Codec Karşılaştırması
| Codec |
Compression Ratio |
CPU Usage |
Edge Uygunluğu |
Önerilen Kullanım |
| H.264 |
1.0x (baseline) |
Düşük |
⭐⭐⭐⭐⭐ |
Genel kullanım, en yaygın |
| H.265/HEVC |
1.5-2.0x |
Orta-Yüksek |
⭐⭐⭐⭐ |
Yüksek kalite, sınırlı bant genişliği |
| AV1 |
2.0-2.5x |
Çok Yüksek |
⭐⭐ |
Gelecek için, yüksek performanslı cihazlar |
| MJPEG |
0.3-0.5x |
Çok Düşük |
⭐⭐⭐ |
Düşük latency, frame-by-frame işleme |
Edge processing için önerilen format: RGB24 veya YUV420
RGB24:
- ✅ Her pixel için 3 byte (R, G, B)
- ✅ Doğrudan görüntü işleme için uygun
- ✅ OpenCV/GoCV ile kolay entegrasyon
- ❌ Daha büyük bellek kullanımı
YUV420:
- ✅ Daha küçük bellek kullanımı (%50 daha az)
- ✅ Video codec’ler için native format
- ❌ RGB’ye dönüşüm gerekebilir
1
2
3
4
5
6
|
// FFmpeg'de format seçimi
// RGB24 için:
args := []string{"-pix_fmt", "rgb24"}
// YUV420 için (daha küçük):
args := []string{"-pix_fmt", "yuv420p"}
|
7. Video Kaynağından Frame Almak
Video kaynağından frame almak için FFmpeg’i Go uygulamamız içinde process olarak çalıştıracağız. FFmpeg, RTSP, USB kamera, dosya ve daha birçok kaynaktan video okuyabilir.
7.1 Farklı Video Kaynakları
Production ortamında farklı video kaynak türleriyle karşılaşabilirsiniz:
RTSP Stream (IP Kamera)
En yaygın kullanım senaryosu. IP kameralar genellikle RTSP protokolü üzerinden video sağlar.
USB Kamera
USB kameralar için FFmpeg’in video4linux2 (v4l2) desteğini kullanabiliriz:
1
|
ffmpeg -f v4l2 -i /dev/video0 -f rawvideo -pix_fmt rgb24 -s 640x480 pipe:1
|
Video Dosyası
Test ve geliştirme için video dosyalarından okuma:
1
|
ffmpeg -i test_video.mp4 -f rawvideo -pix_fmt rgb24 -s 640x480 pipe:1
|
HTTP/HTTPS Stream
Bazı modern kameralar HTTP üzerinden MJPEG veya HLS stream sağlar.
7.2 FFmpeg Komutu
1
2
|
ffmpeg -rtsp_transport tcp -i rtsp://username:password@camera-ip:554/stream \
-f rawvideo -pix_fmt rgb24 -s 640x480 pipe:1
|
Bu komut:
-rtsp_transport tcp: RTSP bağlantısı için TCP kullanır (güvenilir)
-f rawvideo: Ham video formatı çıktısı
-pix_fmt rgb24: RGB24 pixel formatı (her pixel 3 byte)
-s 640x480: Çözünürlük ayarı (gereksinime göre değiştirilebilir)
pipe:1: Stdout’a yazar (Go’da okuyacağız)
7.3 Go ile FFmpeg Entegrasyonu
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
package main
import (
"bufio"
"context"
"fmt"
"io"
"log"
"os/exec"
"sync"
"time"
)
type Frame struct {
Data []byte
Width int
Height int
Timestamp time.Time
}
type VideoSource struct {
sourceURL string
width int
height int
cmd *exec.Cmd
stdout io.ReadCloser
frames chan *Frame
errors chan error
wg sync.WaitGroup
}
func NewVideoSource(sourceURL string, width, height int) *VideoSource {
return &VideoSource{
sourceURL: sourceURL,
width: width,
height: height,
frames: make(chan *Frame, 10), // Buffer 10 frames
errors: make(chan error, 1),
}
}
func (vs *VideoSource) Start(ctx context.Context) error {
// FFmpeg komutunu oluştur
args := []string{
"-rtsp_transport", "tcp",
"-i", vs.sourceURL,
"-f", "rawvideo",
"-pix_fmt", "rgb24",
"-s", fmt.Sprintf("%dx%d", vs.width, vs.height),
"pipe:1",
}
vs.cmd = exec.CommandContext(ctx, "ffmpeg", args...)
var err error
vs.stdout, err = vs.cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("stdout pipe: %w", err)
}
stderr, err := vs.cmd.StderrPipe()
if err != nil {
return fmt.Errorf("stderr pipe: %w", err)
}
if err := vs.cmd.Start(); err != nil {
return fmt.Errorf("start ffmpeg: %w", err)
}
// FFmpeg stderr loglarını ayrı goroutine'de oku
go func() {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
log.Printf("FFmpeg: %s", scanner.Text())
}
}()
// Frame okuma goroutine'i
vs.wg.Add(1)
go vs.readFrames(ctx)
return nil
}
func (vs *VideoSource) readFrames(ctx context.Context) {
defer vs.wg.Done()
frameSize := vs.width * vs.height * 3 // RGB24: her pixel 3 byte
frameBuffer := make([]byte, frameSize)
for {
select {
case <-ctx.Done():
// Context iptal olduğunda FFmpeg process'ini de sonlandır
if vs.cmd != nil && vs.cmd.Process != nil {
_ = vs.cmd.Process.Kill()
}
return
default:
// Bir frame'in tamamını oku
n, err := io.ReadFull(vs.stdout, frameBuffer)
if err != nil {
if err != io.EOF && err != io.ErrUnexpectedEOF {
select {
case vs.errors <- fmt.Errorf("read frame: %w", err):
case <-ctx.Done():
}
}
return
}
if n != frameSize {
continue // Eksik frame, atla
}
// Yeni frame buffer'ı kopyala (goroutine safe)
frameData := make([]byte, frameSize)
copy(frameData, frameBuffer)
frame := &Frame{
Data: frameData,
Width: vs.width,
Height: vs.height,
Timestamp: time.Now(),
}
select {
case vs.frames <- frame:
case <-ctx.Done():
return
}
}
}
}
func (vs *VideoSource) Frames() <-chan *Frame {
return vs.frames
}
func (vs *VideoSource) Errors() <-chan error {
return vs.errors
}
func (vs *VideoSource) Stop() error {
if vs.cmd != nil && vs.cmd.Process != nil {
vs.cmd.Process.Kill()
}
vs.wg.Wait()
close(vs.frames)
close(vs.errors)
return nil
}
|
8. Go ile Video Pipeline
Go’nun goroutine ve channel yapısı, video işleme pipeline’ları için mükemmeldir. Her aşama bir goroutine olarak çalışır ve channel’lar üzerinden frame’ler geçirilir.
8.1 Pipeline Mimarisi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
package main
import (
"context"
"fmt"
"sync"
"time"
)
type Pipeline struct {
source *VideoSource
processors []FrameProcessor
wg sync.WaitGroup
}
type FrameProcessor interface {
Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame)
Name() string
}
func NewPipeline(source *VideoSource) *Pipeline {
return &Pipeline{
source: source,
processors: make([]FrameProcessor, 0),
}
}
func (p *Pipeline) AddProcessor(processor FrameProcessor) {
p.processors = append(p.processors, processor)
}
func (p *Pipeline) Run(ctx context.Context) error {
// Video kaynağını başlat
if err := p.source.Start(ctx); err != nil {
return err
}
// Channel'ları oluştur
channels := make([]chan *Frame, len(p.processors)+1)
for i := range channels {
channels[i] = make(chan *Frame, 10)
}
// Source'dan ilk channel'a kopyala
p.wg.Add(1)
go func() {
defer p.wg.Done()
defer close(channels[0])
for frame := range p.source.Frames() {
select {
case channels[0] <- frame:
case <-ctx.Done():
return
}
}
}()
// Her processor'ı çalıştır
for i, proc := range p.processors {
p.wg.Add(1)
go func(idx int, processor FrameProcessor) {
defer p.wg.Done()
defer func() {
if r := recover(); r != nil {
fmt.Printf("Processor panic: %v\n", r)
}
close(channels[idx+1])
}()
processor.Process(ctx, channels[idx], channels[idx+1])
}(i, proc)
}
// Hata kontrolü
go func() {
for err := range p.source.Errors() {
// Log error (örnek)
fmt.Printf("Video source error: %v\n", err)
}
}()
return nil
}
func (p *Pipeline) Stop() error {
return p.source.Stop()
}
|
8.2 Frame Struct İyileştirmesi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
type Frame struct {
Data []byte
Width int
Height int
Timestamp time.Time
Metadata map[string]interface{} // Analiz sonuçları için
}
func NewFrame(data []byte, width, height int) *Frame {
return &Frame{
Data: data,
Width: width,
Height: height,
Timestamp: time.Now(),
Metadata: make(map[string]interface{}),
}
}
|
9. Gerçek Zamanlı Video Analizi
9.0 ROI (Region of Interest) Detection
Tüm frame’i işlemek yerine, sadece ilgili bölgeleri (ROI) işleyerek CPU kullanımını önemli ölçüde azaltabiliriz:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
import (
"image"
"sync"
)
type ROIDetector struct {
regions []image.Rectangle
mutex sync.RWMutex
}
func NewROIDetector() *ROIDetector {
return &ROIDetector{
regions: make([]image.Rectangle, 0),
}
}
// Motion detection sonuçlarına göre ROI belirle
func (rd *ROIDetector) UpdateROI(motionMap []bool, width, height int) {
rd.mutex.Lock()
defer rd.mutex.Unlock()
// Motion olan bölgeleri tespit et ve bounding box oluştur
// Basit implementasyon: motion olan pixel'lerin bounding box'ı
minX, minY := width, height
maxX, maxY := 0, 0
for y := 0; y < height; y++ {
for x := 0; x < width; x++ {
idx := y*width + x
if idx < len(motionMap) && motionMap[idx] {
if x < minX {
minX = x
}
if x > maxX {
maxX = x
}
if y < minY {
minY = y
}
if y > maxY {
maxY = y
}
}
}
}
if maxX > minX && maxY > minY {
// Padding ekle
padding := 20
roi := image.Rect(
maxInt(0, minX-padding),
maxInt(0, minY-padding),
minInt(width, maxX+padding),
minInt(height, maxY+padding),
)
rd.regions = []image.Rectangle{roi}
}
}
func maxInt(a, b int) int {
if a > b {
return a
}
return b
}
func minInt(a, b int) int {
if a < b {
return a
}
return b
}
// Frame'in sadece ROI bölgesini çıkar
func (rd *ROIDetector) ExtractROI(frame *Frame) []*Frame {
rd.mutex.RLock()
defer rd.mutex.RUnlock()
roiFrames := make([]*Frame, 0, len(rd.regions))
for _, roi := range rd.regions {
roiData := extractRegion(frame.Data, frame.Width, frame.Height, roi)
roiFrame := &Frame{
Data: roiData,
Width: roi.Dx(),
Height: roi.Dy(),
Timestamp: frame.Timestamp,
Metadata: map[string]interface{}{
"roi": roi,
"original_width": frame.Width,
"original_height": frame.Height,
},
}
roiFrames = append(roiFrames, roiFrame)
}
return roiFrames
}
func extractRegion(data []byte, width, height int, roi image.Rectangle) []byte {
roiWidth := roi.Dx()
roiHeight := roi.Dy()
// ROI'nin frame sınırları içinde olduğundan emin ol (bounds check)
if roi.Min.X < 0 || roi.Min.Y < 0 || roi.Max.X > width || roi.Max.Y > height {
return nil
}
roiData := make([]byte, roiWidth*roiHeight*3)
for y := 0; y < roiHeight; y++ {
srcY := roi.Min.Y + y
srcOffset := (srcY*width + roi.Min.X) * 3
dstOffset := (y * roiWidth) * 3
// Her satırı tek seferde kopyala (daha hızlı, daha az bounds check)
copy(roiData[dstOffset:dstOffset+roiWidth*3],
data[srcOffset:srcOffset+roiWidth*3])
}
return roiData
}
|
ROI Kullanımının Avantajları:
- %60-80 CPU tasarrufu - Sadece ilgili bölgeleri işleme
- Daha hızlı object detection - Küçük ROI’ler üzerinde inference
- Daha az bellek kullanımı - Sadece ROI frame’leri saklama
9.1 Motion Detection
Motion detection, frame’ler arasındaki farkları hesaplayarak hareket tespiti yapar. Edge sistemlerde hafif ve hızlı bir yöntemdir.
Frame Differencing Algoritması
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
package main
import (
"context"
"sync"
)
type MotionDetector struct {
threshold float64
previousFrame []byte
width, height int
mutex sync.RWMutex // previousFrame için thread-safety
}
func NewMotionDetector(threshold float64, width, height int) *MotionDetector {
return &MotionDetector{
threshold: threshold,
width: width,
height: height,
}
}
func (md *MotionDetector) Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
for frame := range in {
select {
case <-ctx.Done():
return
default:
hasMotion := md.detectMotion(frame)
frame.Metadata["motion_detected"] = hasMotion
if hasMotion {
frame.Metadata["motion_score"] = md.calculateMotionScore(frame)
}
select {
case out <- frame:
case <-ctx.Done():
return
}
}
}
}
func (md *MotionDetector) Name() string {
return "MotionDetector"
}
func (md *MotionDetector) detectMotion(frame *Frame) bool {
md.mutex.Lock()
defer md.mutex.Unlock()
if md.previousFrame == nil {
// İlk frame, sakla ve hareket yok
md.previousFrame = make([]byte, len(frame.Data))
copy(md.previousFrame, frame.Data)
return false
}
diff := md.calculateDifference(frame.Data, md.previousFrame)
motionRatio := float64(diff) / float64(len(frame.Data))
// Önceki frame buffer'ını yeniden kullan (gerekiyorsa yeniden allocate et)
if md.previousFrame == nil || len(md.previousFrame) != len(frame.Data) {
md.previousFrame = make([]byte, len(frame.Data))
}
copy(md.previousFrame, frame.Data)
return motionRatio > md.threshold
}
func (md *MotionDetector) calculateDifference(current, previous []byte) int {
diff := 0
for i := 0; i < len(current) && i < len(previous); i++ {
if current[i] > previous[i] {
diff += int(current[i] - previous[i])
} else {
diff += int(previous[i] - current[i])
}
}
return diff
}
func (md *MotionDetector) calculateMotionScore(frame *Frame) float64 {
md.mutex.RLock()
defer md.mutex.RUnlock()
if md.previousFrame == nil {
return 0.0
}
diff := md.calculateDifference(frame.Data, md.previousFrame)
maxDiff := len(frame.Data) * 255
return float64(diff) / float64(maxDiff)
}
|
9.2 Object Detection
Object detection için GoCV (OpenCV binding) veya external inference servisleri kullanılabilir.
GoCV ile Basit Object Detection
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
package main
import (
"context"
"fmt"
"image"
"os"
"strings"
"gocv.io/x/gocv"
)
type ObjectDetector struct {
net gocv.Net
classes []string
}
func NewObjectDetector(modelPath, configPath, classesPath string) (*ObjectDetector, error) {
net := gocv.ReadNet(modelPath, configPath)
if net.Empty() {
return nil, fmt.Errorf("failed to load model")
}
// GPU varsa kullan (opsiyonel)
net.SetPreferableBackend(gocv.NetBackendOpenVINO)
net.SetPreferableTarget(gocv.NetTargetCPU) // veya NetTargetVPU
// Classes dosyasını oku
classes, err := loadClasses(classesPath)
if err != nil {
return nil, fmt.Errorf("failed to load classes: %w", err)
}
return &ObjectDetector{
net: net,
classes: classes,
}, nil
}
func (od *ObjectDetector) Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
for frame := range in {
select {
case <-ctx.Done():
return
default:
detections := od.detect(frame)
frame.Metadata["detections"] = detections
select {
case out <- frame:
case <-ctx.Done():
return
}
}
}
}
func (od *ObjectDetector) Name() string {
return "ObjectDetector"
}
func (od *ObjectDetector) detect(frame *Frame) []Detection {
// Frame data'yı gocv.Mat'e dönüştür
img, err := gocv.NewMatFromBytes(
frame.Height,
frame.Width,
gocv.MatTypeCV8UC3,
frame.Data,
)
if err != nil {
return nil
}
defer img.Close()
// Blob oluştur (model input formatı)
blob := gocv.BlobFromImage(img, 1.0/255.0, image.Pt(416, 416), gocv.NewScalar(0, 0, 0, 0), true, false)
defer blob.Close()
// Inference
od.net.SetInput(blob, "")
prob := od.net.Forward("")
defer prob.Close()
// Sonuçları parse et
detections := parseDetections(prob, frame.Width, frame.Height)
return detections
}
type Detection struct {
Class string
Confidence float64
BBox image.Rectangle
}
func parseDetections(prob gocv.Mat, imgWidth, imgHeight int) []Detection {
// Model output'una göre parse et
// Bu örnek basitleştirilmiş, gerçek implementasyon model'e bağlı
return []Detection{}
}
func loadClasses(path string) ([]string, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("failed to read classes file: %w", err)
}
lines := strings.Split(string(data), "\n")
classes := make([]string, 0, len(lines))
for _, line := range lines {
line = strings.TrimSpace(line)
if line != "" {
classes = append(classes, line)
}
}
return classes, nil
}
|
External Inference Service (HTTP/REST)
Model çok büyükse veya özel hardware (NVIDIA Jetson, Intel Neural Compute Stick) varsa, inference’ı ayrı bir servise taşıyabiliriz:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
package main
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"time"
)
type ExternalDetector struct {
client *http.Client
apiURL string
timeout time.Duration
}
func (ed *ExternalDetector) Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
for frame := range in {
select {
case <-ctx.Done():
return
default:
detections, err := ed.callInferenceService(frame)
if err == nil {
frame.Metadata["detections"] = detections
} else {
frame.Metadata["detection_error"] = err.Error()
}
select {
case out <- frame:
case <-ctx.Done():
return
}
}
}
}
func (ed *ExternalDetector) callInferenceService(frame *Frame) ([]Detection, error) {
// Frame'i base64 encode et veya binary gönder
reqBody, err := json.Marshal(map[string]interface{}{
"image": base64.StdEncoding.EncodeToString(frame.Data),
"width": frame.Width,
"height": frame.Height,
})
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), ed.timeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "POST", ed.apiURL, bytes.NewReader(reqBody))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
resp, err := ed.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var result struct {
Detections []Detection `json:"detections"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, err
}
return result.Detections, nil
}
|
10. Edge Decision Layer
Edge Decision Layer, analiz sonuçlarına göre karar verir ve sadece önemli event’leri cloud’a gönderir. Bu katman:
- Threshold filtering - Düşük confidence’ları filtreler
- Event deduplication - Aynı event’i tekrar tekrar göndermez
- Rate limiting - Çok fazla event göndermeyi engeller
- Event aggregation - Birden fazla event’i birleştirir
10.1 Event Publisher Implementasyonları
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
package main
import (
"context"
"fmt"
"sync"
"time"
)
type EdgeDecisionLayer struct {
minConfidence float64
eventPublisher EventPublisher
lastEvents map[string]time.Time
mutex sync.RWMutex
cooldown time.Duration
}
type EventPublisher interface {
PublishWithContext(ctx context.Context, event *Event) error
}
type Event struct {
Type string `json:"type"`
Timestamp time.Time `json:"timestamp"`
Confidence float64 `json:"confidence"`
Detections []Detection `json:"detections,omitempty"`
Metadata map[string]interface{} `json:"metadata"`
}
func NewEdgeDecisionLayer(minConfidence float64, publisher EventPublisher, cooldown time.Duration) *EdgeDecisionLayer {
return &EdgeDecisionLayer{
minConfidence: minConfidence,
eventPublisher: publisher,
lastEvents: make(map[string]time.Time),
cooldown: cooldown,
}
}
func (edl *EdgeDecisionLayer) Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
for frame := range in {
select {
case <-ctx.Done():
return
default:
edl.processFrame(frame)
// Frame'i pipeline'a devam ettir
select {
case out <- frame:
case <-ctx.Done():
return
}
}
}
}
func (edl *EdgeDecisionLayer) Name() string {
return "EdgeDecisionLayer"
}
func (edl *EdgeDecisionLayer) processFrame(frame *Frame) {
// Detections varsa kontrol et
detections, ok := frame.Metadata["detections"].([]Detection)
if !ok {
return
}
// Yüksek confidence'lı detections'ları filtrele
significantDetections := edl.filterByConfidence(detections)
if len(significantDetections) == 0 {
return
}
// Event tipini belirle
eventType := edl.determineEventType(significantDetections)
// Cooldown kontrolü (aynı event'i çok sık gönderme)
if !edl.shouldSendEvent(eventType) {
return
}
// Event oluştur
event := &Event{
Type: eventType,
Timestamp: frame.Timestamp,
Confidence: edl.calculateMaxConfidence(significantDetections),
Detections: significantDetections,
Metadata: map[string]interface{}{
"frame_width": frame.Width,
"frame_height": frame.Height,
},
}
// Pipeline'ı bloklamamak için event'i async ve timeout'lu publish et
go func(evt *Event, evtType string) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := edl.eventPublisher.PublishWithContext(ctx, evt); err != nil {
fmt.Printf("Failed to publish event: %v\n", err)
return
}
// Başarılı publish sonrası cooldown güncelle
edl.mutex.Lock()
edl.lastEvents[evtType] = time.Now()
edl.mutex.Unlock()
}(event, eventType)
}
func (edl *EdgeDecisionLayer) filterByConfidence(detections []Detection) []Detection {
filtered := make([]Detection, 0)
for _, det := range detections {
if det.Confidence >= edl.minConfidence {
filtered = append(filtered, det)
}
}
return filtered
}
func (edl *EdgeDecisionLayer) determineEventType(detections []Detection) string {
// Basit logic: ilk detection'ın class'ını kullan
// Gerçek uygulamada daha karmaşık logic olabilir
if len(detections) > 0 {
return detections[0].Class + "_detected"
}
return "unknown"
}
func (edl *EdgeDecisionLayer) shouldSendEvent(eventType string) bool {
edl.mutex.RLock()
lastTime, exists := edl.lastEvents[eventType]
edl.mutex.RUnlock()
if !exists {
return true
}
return time.Since(lastTime) > edl.cooldown
}
func (edl *EdgeDecisionLayer) calculateMaxConfidence(detections []Detection) float64 {
max := 0.0
for _, det := range detections {
if det.Confidence > max {
max = det.Confidence
}
}
return max
}
|
MQTT Publisher
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
package main
import (
"encoding/json"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
type MQTTPublisher struct {
client mqtt.Client
topic string
}
func NewMQTTPublisher(brokerURL, topic, clientID string) (*MQTTPublisher, error) {
opts := mqtt.NewClientOptions()
opts.AddBroker(brokerURL)
opts.SetClientID(clientID)
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
return nil, token.Error()
}
return &MQTTPublisher{
client: client,
topic: topic,
}, nil
}
func (mp *MQTTPublisher) PublishWithContext(ctx context.Context, event *Event) error {
data, err := json.Marshal(event)
if err != nil {
return err
}
token := mp.client.Publish(mp.topic, 1, false, data)
done := make(chan struct{})
go func() {
token.Wait()
close(done)
}()
select {
case <-ctx.Done():
return ctx.Err()
case <-done:
return token.Error()
}
}
func (mp *MQTTPublisher) Close() error {
if mp.client != nil && mp.client.IsConnected() {
// Bekleyen operasyonlar için 250ms timeout ile disconnect
mp.client.Disconnect(250)
}
return nil
}
|
HTTP Webhook Publisher
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
package main
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)
type WebhookPublisher struct {
client *http.Client
webhookURL string
}
func (wp *WebhookPublisher) PublishWithContext(ctx context.Context, event *Event) error {
data, err := json.Marshal(event)
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, "POST", wp.webhookURL, bytes.NewReader(data))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := wp.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
return fmt.Errorf("webhook returned status %d", resp.StatusCode)
}
return nil
}
|
11. Multi-Camera Desteği
Production ortamında genellikle birden fazla kamera yönetmek gerekir. Her kamera için ayrı pipeline oluşturarak paralel işleme yapabiliriz:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
import (
"context"
"fmt"
"log"
"sync"
)
type MultiCameraManager struct {
cameras map[string]*CameraPipeline
mutex sync.RWMutex
ctx context.Context
cancel context.CancelFunc
}
type CameraPipeline struct {
ID string
Source *VideoSource
Pipeline *Pipeline
Config CameraConfig
}
type CameraConfig struct {
URL string
Width int
Height int
MotionEnabled bool
ObjectEnabled bool
}
func NewMultiCameraManager() *MultiCameraManager {
ctx, cancel := context.WithCancel(context.Background())
return &MultiCameraManager{
cameras: make(map[string]*CameraPipeline),
ctx: ctx,
cancel: cancel,
}
}
func (mcm *MultiCameraManager) AddCamera(id string, config CameraConfig) error {
mcm.mutex.Lock()
defer mcm.mutex.Unlock()
source := NewVideoSource(config.URL, config.Width, config.Height)
pipeline := NewPipeline(source)
// Motion detector ekle
if config.MotionEnabled {
motionDetector := NewMotionDetector(0.05, config.Width, config.Height)
pipeline.AddProcessor(motionDetector)
}
// Object detector ekle
if config.ObjectEnabled {
objectDetector, err := NewObjectDetector("model.onnx", "config.yaml", "classes.txt")
if err == nil {
pipeline.AddProcessor(objectDetector)
}
}
cameraPipeline := &CameraPipeline{
ID: id,
Source: source,
Pipeline: pipeline,
Config: config,
}
mcm.cameras[id] = cameraPipeline
// Pipeline'ı başlat
go func() {
if err := pipeline.Run(mcm.ctx); err != nil {
log.Printf("Camera %s pipeline hatası: %v", id, err)
}
}()
return nil
}
func (mcm *MultiCameraManager) RemoveCamera(id string) error {
mcm.mutex.Lock()
defer mcm.mutex.Unlock()
if camera, exists := mcm.cameras[id]; exists {
camera.Pipeline.Stop()
delete(mcm.cameras, id)
}
return nil
}
func (mcm *MultiCameraManager) GetCameraStatus(id string) (map[string]interface{}, error) {
mcm.mutex.RLock()
defer mcm.mutex.RUnlock()
camera, exists := mcm.cameras[id]
if !exists {
return nil, fmt.Errorf("camera %s not found", id)
}
return map[string]interface{}{
"id": camera.ID,
"url": camera.Config.URL,
"status": "running",
"width": camera.Config.Width,
"height": camera.Config.Height,
}, nil
}
|
12.1 Memory Pooling
Video işleme, çok fazla bellek allocation’ı gerektirir. sync.Pool kullanarak bellek allocation overhead’ini azaltabiliriz:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
var framePool = sync.Pool{
New: func() interface{} {
// Frame buffer'ı için pool
return make([]byte, 640*480*3) // RGB24, 640x480
},
}
// Frame okurken pool'dan al (doğru kullanım örneği)
func (vs *VideoSource) readFrames(ctx context.Context) {
frameSize := vs.width * vs.height * 3
for {
frameBuffer := framePool.Get().([]byte)
n, err := io.ReadFull(vs.stdout, frameBuffer)
if err != nil {
// Hata durumunda buffer'ı hemen pool'a geri ver
framePool.Put(frameBuffer)
if err == io.EOF || err == io.ErrUnexpectedEOF {
return
}
select {
case vs.errors <- fmt.Errorf("read frame: %w", err):
case <-ctx.Done():
}
return
}
if n != frameSize {
framePool.Put(frameBuffer)
continue
}
// Frame data'yı pool'dan bağımsız olacak şekilde kopyala
frameData := make([]byte, frameSize)
copy(frameData, frameBuffer)
// Buffer'ı pool'a geri ver
framePool.Put(frameBuffer)
frame := &Frame{
Data: frameData,
Width: vs.width,
Height: vs.height,
Timestamp: time.Now(),
Metadata: make(map[string]interface{}),
}
select {
case vs.frames <- frame:
case <-ctx.Done():
return
}
}
}
|
12.2 Frame Rate Kontrolü
Bazı durumlarda tüm frame’leri işlemek gerekmez. Frame skipping ile CPU kullanımını azaltabiliriz:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
type FrameRateController struct {
targetFPS int
frameCounter int
skipRatio int
}
func NewFrameRateController(targetFPS int) *FrameRateController {
// Örneğin: 30 FPS'den 10 FPS'e düşürmek için her 3 frame'den 1'ini al
skipRatio := 30 / targetFPS
return &FrameRateController{
targetFPS: targetFPS,
skipRatio: skipRatio,
}
}
func (frc *FrameRateController) ShouldProcess() bool {
frc.frameCounter++
return frc.frameCounter%frc.skipRatio == 0
}
|
12.3 Context ve Cancellation
Uzun süren işlemlerde context cancellation kullanarak kaynakları doğru şekilde temizleyelim:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import (
"os"
"os/signal"
"syscall"
)
func (p *Pipeline) Run(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Graceful shutdown için signal handling
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
cancel()
}()
// Pipeline'ı çalıştır
// ...
return nil
}
|
12.4 Benchmarking
Performans metriklerini toplamak için:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
type PerformanceMetrics struct {
FramesProcessed int64
ProcessingTime time.Duration
FrameRate float64
startTime time.Time
mutex sync.RWMutex
}
func NewPerformanceMetrics() *PerformanceMetrics {
return &PerformanceMetrics{
startTime: time.Now(),
}
}
func (pm *PerformanceMetrics) RecordFrame(processingTime time.Duration) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
pm.FramesProcessed++
pm.ProcessingTime += processingTime
elapsed := time.Since(pm.startTime).Seconds()
if elapsed > 0 {
pm.FrameRate = float64(pm.FramesProcessed) / elapsed
}
}
|
Test ortamında elde ettiğimiz bazı gerçek performans değerleri:
- Hardware: Raspberry Pi 4 (4GB RAM), 640x480 çözünürlük
- Motion Detection: ~25-30 FPS (CPU: %15-20)
- Object Detection (CPU): ~2-3 FPS (CPU: %80-90)
- Object Detection (GPU/OpenVINO): ~8-10 FPS (CPU: %40-50)
- Memory Kullanımı: ~150-200 MB (memory pool ile)
- Network Latency (MQTT): ~10-50ms (local broker)
Önemli Not: Object detection için CPU kullanımı çok yüksek. Production’da GPU veya dedicated AI accelerator (Jetson Nano, Neural Compute Stick) kullanılması önerilir.
12.6 Error Handling ve Resilience
Retry Mekanizması
Video kaynağı bağlantıları kesilebilir. Retry mekanizması ekleyelim:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
func (vs *VideoSource) StartWithRetry(ctx context.Context, maxRetries int) error {
var lastErr error
for i := 0; i < maxRetries; i++ {
if err := vs.Start(ctx); err == nil {
return nil
}
lastErr = err
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second * time.Duration(i+1)):
// Exponential backoff
}
}
return fmt.Errorf("failed after %d retries: %w", maxRetries, lastErr)
}
|
Graceful Degradation
Object detection servisi çalışmıyorsa, sadece motion detection ile devam edebiliriz:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
func (od *ObjectDetector) Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
for frame := range in {
select {
case <-ctx.Done():
return
default:
detections, err := od.detect(frame)
if err != nil {
// Error log'la ama pipeline'ı durdurma
frame.Metadata["detection_error"] = err.Error()
} else {
frame.Metadata["detections"] = detections
}
select {
case out <- frame:
case <-ctx.Done():
return
}
}
}
}
|
12.7 Monitoring ve Logging
Structured Logging
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import "github.com/sirupsen/logrus"
var logger = logrus.New()
func init() {
logger.SetFormatter(&logrus.JSONFormatter{})
logger.SetLevel(logrus.InfoLevel)
}
// Kullanım
logger.WithFields(logrus.Fields{
"frame_id": frameID,
"fps": fps,
"detections": len(detections),
"processing_time_ms": processingTime.Milliseconds(),
}).Info("Frame processed")
|
Metrics Export (Prometheus)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
import "github.com/prometheus/client_golang/prometheus"
var (
framesProcessed = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "video_frames_processed_total",
Help: "Total number of frames processed",
},
[]string{"source"},
)
processingDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "video_frame_processing_seconds",
Help: "Frame processing duration",
Buckets: prometheus.DefBuckets,
},
[]string{"stage"},
)
)
func init() {
prometheus.MustRegister(framesProcessed)
prometheus.MustRegister(processingDuration)
}
|
Metrics HTTP Endpoint
Prometheus metrics’lerini expose etmek için HTTP endpoint ekleyelim:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import (
"net/http"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
func startMetricsServer(port string) error {
http.Handle("/metrics", promhttp.Handler())
return http.ListenAndServe(":"+port, nil)
}
// main.go içinde
go func() {
if err := startMetricsServer("2112"); err != nil {
log.Printf("Metrics server error: %v", err)
}
}()
|
Go’nun built-in profiling araçları ile performans analizi yapabiliriz.
pprof ile CPU Profiling
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import (
_ "net/http/pprof"
"net/http"
)
func main() {
// Profiling endpoint'lerini ekle
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
// ... uygulama kodunuz
}
// Profil toplamak için:
// go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30
// go tool pprof http://localhost:6060/debug/pprof/heap
|
Memory Profiling
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
import (
"runtime"
"runtime/pprof"
"os"
)
func captureMemoryProfile(filename string) error {
f, err := os.Create(filename)
if err != nil {
return err
}
defer f.Close()
runtime.GC() // GC'yi zorla çalıştır
return pprof.WriteHeapProfile(f)
}
// Kullanım: Periyodik olarak memory profile al
go func() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for range ticker.C {
captureMemoryProfile("memprofile.prof")
}
}()
|
Goroutine Profiling
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import (
"runtime/pprof"
"os"
)
func captureGoroutineProfile(filename string) error {
f, err := os.Create(filename)
if err != nil {
return err
}
defer f.Close()
return pprof.Lookup("goroutine").WriteTo(f, 0)
}
|
13. Testing
Video işleme sistemlerinde test yazmak için mock’lar ve test utilities kullanmalıyız. Production sistemler için kapsamlı test stratejisi kritik öneme sahiptir.
13.1 Unit Test Örneği
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
package main
import (
"testing"
"time"
)
func TestMotionDetector(t *testing.T) {
detector := NewMotionDetector(0.05, 640, 480)
// İlk frame - motion olmamalı
frame1 := &Frame{
Data: make([]byte, 640*480*3),
Width: 640,
Height: 480,
Timestamp: time.Now(),
Metadata: make(map[string]interface{}),
}
hasMotion := detector.detectMotion(frame1)
if hasMotion {
t.Error("İlk frame'de motion olmamalı")
}
// İkinci frame - değişiklik yap
frame2 := &Frame{
Data: make([]byte, 640*480*3),
Width: 640,
Height: 480,
Timestamp: time.Now(),
Metadata: make(map[string]interface{}),
}
// Frame2'de değişiklik yap (pixel'leri değiştir)
for i := 0; i < len(frame2.Data); i += 100 {
frame2.Data[i] = 255
}
hasMotion = detector.detectMotion(frame2)
if !hasMotion {
t.Error("Değişiklik olan frame'de motion olmalı")
}
}
|
13.2 Integration Test
Gerçek bileşenlerin birlikte çalışmasını test eden integration testler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
func TestVideoPipelineIntegration(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Gerçek FFmpeg process ile test (test video dosyası kullan)
source := NewVideoSource("test_video.mp4", 640, 480)
pipeline := NewPipeline(source)
motionDetector := NewMotionDetector(0.05, 640, 480)
objectDetector := NewMockObjectDetector()
edgeDecision := NewEdgeDecisionLayer(0.7, 5*time.Second)
pipeline.AddProcessor(motionDetector)
pipeline.AddProcessor(objectDetector)
pipeline.AddProcessor(edgeDecision)
// Event'leri topla
events := make([]Event, 0)
go func() {
for event := range edgeDecision.Events() {
events = append(events, event)
}
}()
// Pipeline'ı çalıştır
if err := pipeline.Run(ctx); err != nil {
t.Fatalf("Pipeline başlatılamadı: %v", err)
}
// 10 saniye bekle
time.Sleep(10 * time.Second)
pipeline.Stop()
// En az bir event bekleniyor
if len(events) == 0 {
t.Error("Hiç event üretilmedi")
}
// Event'lerin doğru formatta olduğunu kontrol et
for _, event := range events {
if event.Timestamp.IsZero() {
t.Error("Event timestamp boş")
}
if event.Type == "" {
t.Error("Event type boş")
}
}
}
|
13.3 End-to-End (E2E) Test
Tüm sistemin birlikte çalışmasını test eden E2E testler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
func TestE2EVideoProcessing(t *testing.T) {
// Test ortamını başlat
testEnv := setupTestEnvironment(t)
defer testEnv.Cleanup()
// MQTT broker başlat (test için)
mqttBroker := startTestMQTTBroker(t)
defer mqttBroker.Stop()
// Video processing servisini başlat
config := &Config{
VideoSource: "rtsp://test-camera:554/stream",
MQTTBroker: "localhost:1883",
}
service := NewVideoProcessingService(config)
if err := service.Start(); err != nil {
t.Fatalf("Service başlatılamadı: %v", err)
}
defer service.Stop()
// Test video stream'i gönder
go sendTestVideoStream(t, "rtsp://test-camera:554/stream")
// MQTT'dan event'leri dinle
events := subscribeToMQTT(t, "video/events/+")
// 30 saniye içinde event bekleniyor
timeout := time.After(30 * time.Second)
select {
case event := <-events:
// Event'i validate et
if err := validateEvent(event); err != nil {
t.Errorf("Geçersiz event: %v", err)
}
t.Logf("Event alındı: %+v", event)
case <-timeout:
t.Error("Event timeout - hiç event alınamadı")
}
}
|
13.4 Load Testing
Sistemin yük altındaki performansını test eden load testler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
func TestLoadMultipleCameras(t *testing.T) {
if testing.Short() {
t.Skip("Skipping load test in short mode")
}
numCameras := 10
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Multi-camera manager oluştur
manager := NewMultiCameraManager()
// 10 kamera ekle
for i := 0; i < numCameras; i++ {
cameraID := fmt.Sprintf("camera-%d", i)
sourceURL := fmt.Sprintf("rtsp://test-camera-%d:554/stream", i)
if err := manager.AddCamera(cameraID, sourceURL, 640, 480); err != nil {
t.Fatalf("Kamera eklenemedi: %v", err)
}
}
// Tüm kameraları başlat
if err := manager.StartAll(ctx); err != nil {
t.Fatalf("Kameralar başlatılamadı: %v", err)
}
// 2 dakika çalıştır
time.Sleep(2 * time.Minute)
// Metrikleri topla
metrics := manager.GetMetrics()
// Performans kontrolü
if metrics.AverageFPS < 20 {
t.Errorf("Düşük FPS: %f (beklenen: >= 20)", metrics.AverageFPS)
}
if metrics.CPUUsage > 80 {
t.Errorf("Yüksek CPU kullanımı: %f%% (beklenen: < 80%%)", metrics.CPUUsage)
}
if metrics.MemoryUsage > 500*1024*1024 { // 500MB
t.Errorf("Yüksek bellek kullanımı: %d bytes", metrics.MemoryUsage)
}
manager.StopAll()
}
|
13.5 Stress Testing
Sistemin limitlerini test eden stress testler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
func TestStressHighFrameRate(t *testing.T) {
if testing.Short() {
t.Skip("Skipping stress test in short mode")
}
// Çok yüksek frame rate ile test
highFPSSource := NewMockVideoSourceWithFPS(60) // 60 FPS
pipeline := NewPipeline(highFPSSource)
// Tüm processor'ları ekle
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
// Frame drop sayısını izle
frameDrops := atomic.Int64{}
go func() {
for frame := range pipeline.Frames() {
if frame.Metadata["dropped"] == true {
frameDrops.Add(1)
}
}
}()
if err := pipeline.Run(ctx); err != nil {
t.Fatalf("Pipeline başlatılamadı: %v", err)
}
time.Sleep(30 * time.Second)
pipeline.Stop()
dropRate := float64(frameDrops.Load()) / 1800.0 // 30 saniye * 60 FPS
if dropRate > 0.1 { // %10'dan fazla drop
t.Logf("Yüksek frame drop oranı: %.2f%%", dropRate*100)
// Bu durumda backpressure mekanizması çalışıyor demektir
}
}
|
13.6 Pipeline Integration Test
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
func TestPipeline(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Mock video source oluştur
source := &MockVideoSource{
frames: make(chan *Frame, 10),
}
// Test frame'leri ekle
for i := 0; i < 5; i++ {
source.frames <- &Frame{
Data: make([]byte, 640*480*3),
Width: 640,
Height: 480,
Timestamp: time.Now(),
Metadata: make(map[string]interface{}),
}
}
close(source.frames)
// Pipeline oluştur
pipeline := NewPipeline(source)
motionDetector := NewMotionDetector(0.05, 640, 480)
pipeline.AddProcessor(motionDetector)
// Pipeline'ı çalıştır
if err := pipeline.Run(ctx); err != nil {
t.Fatalf("Pipeline başlatılamadı: %v", err)
}
// Sonuçları kontrol et
time.Sleep(100 * time.Millisecond)
pipeline.Stop()
}
|
13.7 Mock Video Source
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
type MockVideoSource struct {
frames chan *Frame
errors chan error
}
func (m *MockVideoSource) Start(ctx context.Context) error {
return nil
}
func (m *MockVideoSource) Frames() <-chan *Frame {
return m.frames
}
func (m *MockVideoSource) Errors() <-chan error {
return m.errors
}
func (m *MockVideoSource) Stop() error {
close(m.frames)
close(m.errors)
return nil
}
|
13.8 Benchmark Test
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
func BenchmarkMotionDetection(b *testing.B) {
detector := NewMotionDetector(0.05, 640, 480)
frame := &Frame{
Data: make([]byte, 640*480*3),
Width: 640,
Height: 480,
Timestamp: time.Now(),
Metadata: make(map[string]interface{}),
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
detector.detectMotion(frame)
// Frame'i değiştir
frame.Data[i%len(frame.Data)]++
}
}
// Çalıştırmak için: go test -bench=BenchmarkMotionDetection -benchmem
|
13.9 Rate Limiting Test
Rate limiting mekanizmasının doğru çalıştığını test etmek için:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
import (
"sync"
"testing"
"time"
"golang.org/x/time/rate"
)
func TestRateLimiting(t *testing.T) {
publisher := &MockPublisher{
published: make([]*Event, 0),
}
// 10 event/saniye limit ile rate limiter oluştur
limiter := rate.NewLimiter(10, 1) // 10 per second, burst 1
rateLimited := &RateLimitedPublisher{
publisher: publisher,
limiter: limiter,
}
// 20 event gönder
start := time.Now()
for i := 0; i < 20; i++ {
event := &Event{
Type: "test",
Timestamp: time.Now(),
Data: map[string]interface{}{"id": i},
}
if err := rateLimited.Publish(event); err != nil {
t.Errorf("Publish error: %v", err)
}
}
elapsed := time.Since(start)
// 20 event için en az 1 saniye gerekli (10 event/sec limit)
// Biraz tolerans ver (0.9 saniye)
if elapsed < 900*time.Millisecond {
t.Errorf("Rate limiting çalışmıyor: %v (beklenen: >= 1s)", elapsed)
}
// Tüm event'lerin yayınlandığını kontrol et
if len(publisher.published) != 20 {
t.Errorf("Beklenen 20 event, alınan: %d", len(publisher.published))
}
}
type MockPublisher struct {
published []*Event
mutex sync.Mutex
}
func (mp *MockPublisher) Publish(event *Event) error {
mp.mutex.Lock()
defer mp.mutex.Unlock()
mp.published = append(mp.published, event)
return nil
}
|
14. Configuration Management
Yapılandırma yönetimi için YAML veya TOML kullanabiliriz.
YAML Configuration
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
import (
"os"
"gopkg.in/yaml.v3"
)
type Config struct {
VideoSource struct {
URL string `yaml:"url"`
Width int `yaml:"width"`
Height int `yaml:"height"`
} `yaml:"video_source"`
Processing struct {
MotionThreshold float64 `yaml:"motion_threshold"`
MinConfidence float64 `yaml:"min_confidence"`
TargetFPS int `yaml:"target_fps"`
} `yaml:"processing"`
MQTT struct {
BrokerURL string `yaml:"broker_url"`
Topic string `yaml:"topic"`
ClientID string `yaml:"client_id"`
} `yaml:"mqtt"`
Server struct {
MetricsPort string `yaml:"metrics_port"`
HealthPort string `yaml:"health_port"`
} `yaml:"server"`
}
func LoadConfig(path string) (*Config, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
var config Config
if err := yaml.Unmarshal(data, &config); err != nil {
return nil, err
}
return &config, nil
}
|
config.yaml Örneği
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
video_source:
url: "rtsp://camera:554/stream"
width: 640
height: 480
processing:
motion_threshold: 0.05
min_confidence: 0.7
target_fps: 10
mqtt:
broker_url: "tcp://mqtt:1883"
topic: "video/events"
client_id: "video-processor"
server:
metrics_port: "2112"
health_port: "8080"
|
15. Health Checks ve Observability
Production ortamlarında health check endpoint’leri kritik öneme sahiptir.
Health Check Endpoint
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
import (
"encoding/json"
"net/http"
"sync/atomic"
)
type HealthStatus struct {
Status string `json:"status"`
Version string `json:"version"`
Uptime string `json:"uptime"`
Frames int64 `json:"frames_processed"`
Errors int64 `json:"errors"`
Details map[string]string `json:"details,omitempty"`
}
type HealthChecker struct {
startTime time.Time
frames int64
errors int64
version string
}
func NewHealthChecker(version string) *HealthChecker {
return &HealthChecker{
startTime: time.Now(),
version: version,
}
}
func (hc *HealthChecker) RecordFrame() {
atomic.AddInt64(&hc.frames, 1)
}
func (hc *HealthChecker) RecordError() {
atomic.AddInt64(&hc.errors, 1)
}
func (hc *HealthChecker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
status := "healthy"
// Son 1 dakikada frame işlenmediyse unhealthy
frames := atomic.LoadInt64(&hc.frames)
if frames == 0 {
status = "unhealthy"
}
uptime := time.Since(hc.startTime).String()
health := HealthStatus{
Status: status,
Version: hc.version,
Uptime: uptime,
Frames: frames,
Errors: atomic.LoadInt64(&hc.errors),
}
w.Header().Set("Content-Type", "application/json")
if status == "healthy" {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusServiceUnavailable)
}
json.NewEncoder(w).Encode(health)
}
// Kullanım
healthChecker := NewHealthChecker("1.0.0")
http.Handle("/health", healthChecker)
go http.ListenAndServe(":8080", nil)
|
Readiness ve Liveness Probes (Kubernetes)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 3
readinessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
timeoutSeconds: 3
failureThreshold: 2
|
16. Backpressure Handling
Video pipeline’larında frame rate yüksek olduğunda channel buffer’lar dolabilir. Backpressure mekanizması ekleyelim.
Adaptive Frame Dropping
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
import (
"math"
"math/rand"
"sync"
"sync/atomic"
)
type FrameDropper struct {
dropRatio float64
bufferFull bool
droppedCount int64
mutex sync.RWMutex
}
func NewFrameDropper() *FrameDropper {
return &FrameDropper{
dropRatio: 0.0,
}
}
func (fd *FrameDropper) ShouldDrop(bufferUsage float64) bool {
fd.mutex.Lock()
defer fd.mutex.Unlock()
// Buffer %80'den fazla dolmuşsa frame drop başlat
if bufferUsage > 0.8 {
fd.bufferFull = true
fd.dropRatio = math.Min(0.5, (bufferUsage-0.8)*2.5) // Max %50 drop
return true
}
if fd.bufferFull && bufferUsage > 0.5 {
// Buffer hala yüksek, drop devam et
return rand.Float64() < fd.dropRatio
}
// Buffer normale döndü
fd.bufferFull = false
fd.dropRatio = 0.0
return false
}
func (fd *FrameDropper) RecordDrop() {
atomic.AddInt64(&fd.droppedCount, 1)
}
|
Channel Buffer Monitoring
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
func monitorChannelBuffer(ch <-chan *Frame, bufferSize int) float64 {
currentLen := len(ch)
return float64(currentLen) / float64(bufferSize)
}
// Pipeline içinde kullanım
select {
case out <- frame:
// Frame gönderildi
case <-time.After(10 * time.Millisecond):
// Timeout - buffer dolu olabilir
bufferUsage := monitorChannelBuffer(out, cap(out))
if frameDropper.ShouldDrop(bufferUsage) {
frameDropper.RecordDrop()
// Frame'i atla
continue
}
// Retry
select {
case out <- frame:
case <-ctx.Done():
return
}
}
|
17. Network Optimizasyonu ve Video Streaming Protokolleri
Edge processing sistemlerinde network optimizasyonu kritik öneme sahiptir. Video akışları yüksek bant genişliği gerektirir ve network gecikmeleri sistem performansını doğrudan etkiler.
17.1 Video Streaming Protokolleri
RTSP (Real-Time Streaming Protocol)
RTSP, IP kameralar için en yaygın protokoldür:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// RTSP URL formatı
rtspURL := "rtsp://username:password@192.168.1.100:554/stream1"
// FFmpeg ile RTSP bağlantısı
args := []string{
"-rtsp_transport", "tcp", // TCP kullan (UDP yerine)
"-i", rtspURL,
"-vf", "scale=640:480",
"-r", "15", // Frame rate limit
"-pix_fmt", "rgb24",
"-f", "rawvideo",
"pipe:1",
}
|
RTSP Optimizasyonları:
- TCP transport kullan (UDP packet loss’u önler)
- Timeout ayarları yap (bağlantı kopmalarında hızlı recovery)
- Reconnection logic ekle (otomatik yeniden bağlanma)
WebRTC (Web Real-Time Communication)
WebRTC, browser tabanlı real-time streaming için idealdir:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
import (
"bytes"
"image"
"image/color"
"image/jpeg"
"sync"
"github.com/gorilla/websocket"
)
// Frame'i JPEG'e çevir (helper function)
func frameToJPEG(frame *Frame, quality int) ([]byte, error) {
img := image.NewRGBA(image.Rect(0, 0, frame.Width, frame.Height))
for y := 0; y < frame.Height; y++ {
for x := 0; x < frame.Width; x++ {
idx := (y*frame.Width + x) * 3
if idx+2 >= len(frame.Data) {
continue
}
img.Set(x, y, color.RGBA{
R: frame.Data[idx],
G: frame.Data[idx+1],
B: frame.Data[idx+2],
A: 255,
})
}
}
var buf bytes.Buffer
if err := jpeg.Encode(&buf, img, &jpeg.Options{Quality: quality}); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// WebRTC için frame'leri WebSocket üzerinden gönderme
type WebRTCStreamer struct {
clients map[string]*websocket.Conn
mutex sync.RWMutex
}
func (ws *WebRTCStreamer) BroadcastFrame(frame *Frame) {
ws.mutex.RLock()
defer ws.mutex.RUnlock()
// Frame'i JPEG'e çevir
jpegData, err := frameToJPEG(frame, 80)
if err != nil {
return
}
// Tüm client'lara gönder
for _, conn := range ws.clients {
if err := conn.WriteMessage(websocket.BinaryMessage, jpegData); err != nil {
// Hatalı bağlantıyı kaldır
delete(ws.clients, conn.RemoteAddr().String())
}
}
}
|
17.2 Bandwidth Optimizasyonu
Adaptive Bitrate Streaming
Network koşullarına göre kaliteyi dinamik olarak ayarlama:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
import (
"fmt"
"sync"
)
type AdaptiveBitrate struct {
currentQuality string
bandwidth float64 // Mbps
frameRate int
resolution string
mutex sync.RWMutex
}
func (ab *AdaptiveBitrate) AdjustQuality(measuredBandwidth float64) {
ab.mutex.Lock()
defer ab.mutex.Unlock()
// Bandwidth'e göre kalite seviyesi belirle
if measuredBandwidth < 1.0 {
ab.resolution = "320x240"
ab.frameRate = 10
} else if measuredBandwidth < 3.0 {
ab.resolution = "640x480"
ab.frameRate = 15
} else {
ab.resolution = "1280x720"
ab.frameRate = 30
}
ab.bandwidth = measuredBandwidth
}
func (ab *AdaptiveBitrate) GetFFmpegArgs() []string {
ab.mutex.RLock()
defer ab.mutex.RUnlock()
width, height := parseResolution(ab.resolution)
return []string{
"-vf", fmt.Sprintf("scale=%d:%d", width, height),
"-r", fmt.Sprintf("%d", ab.frameRate),
}
}
func parseResolution(resolution string) (int, int) {
var width, height int
fmt.Sscanf(resolution, "%dx%d", &width, &height)
return width, height
}
|
<|tool▁calls▁begin|><|tool▁call▁begin|>
read_file
Frame Throttling
Network yükünü azaltmak için frame’leri throttle etme:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
type FrameThrottler struct {
maxFPS int
lastFrame time.Time
minInterval time.Duration
mutex sync.Mutex
}
func NewFrameThrottler(maxFPS int) *FrameThrottler {
return &FrameThrottler{
maxFPS: maxFPS,
minInterval: time.Second / time.Duration(maxFPS),
}
}
func (ft *FrameThrottler) ShouldProcess() bool {
ft.mutex.Lock()
defer ft.mutex.Unlock()
now := time.Now()
if now.Sub(ft.lastFrame) >= ft.minInterval {
ft.lastFrame = now
return true
}
return false
}
|
17.3 Network Resilience
Connection Retry ve Backoff
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
type NetworkRetry struct {
maxRetries int
backoff time.Duration
mutex sync.Mutex
}
func (nr *NetworkRetry) ConnectWithRetry(connectFn func() error) error {
nr.mutex.Lock()
defer nr.mutex.Unlock()
backoff := nr.backoff
for i := 0; i < nr.maxRetries; i++ {
if err := connectFn(); err == nil {
return nil
}
if i < nr.maxRetries-1 {
time.Sleep(backoff)
backoff *= 2 // Exponential backoff
}
}
return fmt.Errorf("connection failed after %d retries", nr.maxRetries)
}
|
Network Quality Monitoring
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
type NetworkMonitor struct {
latency []time.Duration
packetLoss float64
bandwidth float64
mutex sync.RWMutex
}
func (nm *NetworkMonitor) MeasureLatency() time.Duration {
start := time.Now()
// Ping veya test paketi gönder
// ...
latency := time.Since(start)
nm.mutex.Lock()
nm.latency = append(nm.latency, latency)
if len(nm.latency) > 100 {
nm.latency = nm.latency[1:]
}
nm.mutex.Unlock()
return latency
}
func (nm *NetworkMonitor) GetAverageLatency() time.Duration {
nm.mutex.RLock()
defer nm.mutex.RUnlock()
if len(nm.latency) == 0 {
return 0
}
var sum time.Duration
for _, l := range nm.latency {
sum += l
}
return sum / time.Duration(len(nm.latency))
}
|
17.4 Video Compression Stratejileri
Edge’den cloud’a gönderilen event’lerde frame’leri sıkıştırma:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
// JPEG kalite ayarına göre compression
func compressFrame(frame *Frame, quality int) ([]byte, error) {
img := &image.RGBA{
Pix: frame.Data,
Stride: frame.Width * 3,
Rect: image.Rect(0, 0, frame.Width, frame.Height),
}
var buf bytes.Buffer
opts := &jpeg.Options{Quality: quality}
if err := jpeg.Encode(&buf, img, opts); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// Quality seviyeleri:
// - 90-100: Yüksek kalite, büyük dosya
// - 70-89: Orta kalite, orta dosya
// - 50-69: Düşük kalite, küçük dosya
|
18. Deployment ve Containerization
Dockerfile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
# Build aşaması - Multi-stage build ile cache optimization
FROM golang:1.23-alpine AS builder
WORKDIR /app
# FFmpeg ve OpenCV bağımlılıkları
RUN apk add --no-cache \
ffmpeg \
pkgconfig \
gstreamer \
gstreamer-dev \
opencv-dev
# Dependencies önce (cache için - değişmeyen dosyalar)
COPY go.mod go.sum ./
RUN go mod download
# Sonra kod (değişen dosyalar)
COPY . .
# Binary'yi optimize ederek derle
RUN CGO_ENABLED=1 GOOS=linux go build \
-ldflags="-w -s" \
-o video-processor ./cmd/main.go
# Runtime image - Minimal ve güvenli
FROM alpine:latest
RUN apk add --no-cache \
ffmpeg \
ca-certificates \
libc6-compat
# Non-root user oluştur (security best practice)
RUN addgroup -g 1000 appuser && \
adduser -D -u 1000 -G appuser appuser
WORKDIR /app
# Binary'yi builder'dan kopyala
COPY --from=builder /app/video-processor /app/video-processor
# Ownership'i değiştir
RUN chown -R appuser:appuser /app
# Non-root user olarak çalıştır
USER appuser
CMD ["/app/video-processor"]
|
Docker Compose Örneği
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
version: '3.8'
services:
video-processor:
build: .
environment:
- RTSP_URL=rtsp://camera:554/stream
- MQTT_BROKER=mqtt://broker:1883
- LOG_LEVEL=info
volumes:
- ./config:/app/config
restart: unless-stopped
mqtt-broker:
image: eclipse-mosquitto:latest
ports:
- "1883:1883"
|
K3s/Kubernetes Deployment
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
apiVersion: apps/v1
kind: Deployment
metadata:
name: video-processor
spec:
replicas: 1
selector:
matchLabels:
app: video-processor
template:
metadata:
labels:
app: video-processor
spec:
containers:
- name: video-processor
image: video-processor:latest
env:
- name: RTSP_URL
valueFrom:
configMapKeyRef:
name: video-config
key: rtsp-url
resources:
requests:
memory: "256Mi"
cpu: "500m"
limits:
memory: "512Mi"
cpu: "1000m"
|
18.3 CI/CD Pipeline
Production deployment için CI/CD pipeline örneği:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
# .github/workflows/deploy.yml
name: Build and Deploy
on:
push:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v4
with:
go-version: '1.23'
- name: Run tests
run: |
go test -v -race -coverprofile=coverage.out ./...
- name: Upload coverage
uses: codecov/codecov-action@v3
with:
file: ./coverage.out
build:
needs: test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Build Docker image
run: |
docker build -t video-processor:${{ github.sha }} .
docker tag video-processor:${{ github.sha }} video-processor:latest
- name: Push to registry
run: |
echo "${{ secrets.DOCKER_PASSWORD }}" | docker login -u "${{ secrets.DOCKER_USERNAME }}" --password-stdin
docker push video-processor:${{ github.sha }}
docker push video-processor:latest
deploy:
needs: build
runs-on: ubuntu-latest
steps:
- name: Deploy to Kubernetes
run: |
kubectl set image deployment/video-processor \
video-processor=video-processor:${{ github.sha }} \
-n production
|
GitLab CI örneği:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
# .gitlab-ci.yml
stages:
- test
- build
- deploy
test:
stage: test
image: golang:1.23
script:
- go test -v -race ./...
- go vet ./...
build:
stage: build
image: docker:latest
services:
- docker:dind
script:
- docker build -t $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA .
- docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
- docker tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA $CI_REGISTRY_IMAGE:latest
- docker push $CI_REGISTRY_IMAGE:latest
deploy:
stage: deploy
image: bitnami/kubectl:latest
script:
- kubectl set image deployment/video-processor video-processor=$CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
only:
- main
|
18.4 Blue-Green Deployment
Sıfır downtime deployment için blue-green stratejisi:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
# blue-green-deployment.yaml
apiVersion: v1
kind: Service
metadata:
name: video-processor-service
spec:
selector:
app: video-processor
version: blue # veya green
ports:
- port: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: video-processor-blue
spec:
replicas: 3
template:
metadata:
labels:
app: video-processor
version: blue
spec:
containers:
- name: video-processor
image: video-processor:v1.0.0
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: video-processor-green
spec:
replicas: 0 # Başlangıçta kapalı
template:
metadata:
labels:
app: video-processor
version: green
spec:
containers:
- name: video-processor
image: video-processor:v1.1.0
|
Blue-green deployment script:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
#!/bin/bash
# blue-green-deploy.sh
CURRENT_VERSION=$(kubectl get service video-processor-service -o jsonpath='{.spec.selector.version}')
NEW_VERSION=$([ "$CURRENT_VERSION" == "blue" ] && echo "green" || echo "blue")
NEW_IMAGE="video-processor:v1.2.0"
echo "Current version: $CURRENT_VERSION"
echo "Deploying to: $NEW_VERSION"
# Yeni versiyonu scale up
kubectl scale deployment video-processor-$NEW_VERSION --replicas=3
kubectl set image deployment/video-processor-$NEW_VERSION video-processor=$NEW_IMAGE
# Health check
echo "Waiting for new version to be ready..."
kubectl wait --for=condition=available --timeout=300s deployment/video-processor-$NEW_VERSION
# Service'i yeni versiyona yönlendir
kubectl patch service video-processor-service -p "{\"spec\":{\"selector\":{\"version\":\"$NEW_VERSION\"}}}"
# Eski versiyonu scale down
kubectl scale deployment video-processor-$CURRENT_VERSION --replicas=0
echo "Deployment complete. New version: $NEW_VERSION"
|
18.5 Rollback Mekanizması
Hızlı rollback için mekanizma:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
package main
import (
"context"
"fmt"
"os/exec"
"time"
)
type DeploymentManager struct {
currentVersion string
previousVersion string
kubeconfig string
}
func (dm *DeploymentManager) Rollback() error {
if dm.previousVersion == "" {
return fmt.Errorf("no previous version to rollback to")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Eski versiyona geri dön
cmd := exec.CommandContext(ctx, "kubectl",
"set", "image", "deployment/video-processor",
fmt.Sprintf("video-processor=video-processor:%s", dm.previousVersion),
)
if err := cmd.Run(); err != nil {
return fmt.Errorf("rollback failed: %w", err)
}
// Health check
if err := dm.waitForDeployment(ctx); err != nil {
return fmt.Errorf("rollback health check failed: %w", err)
}
dm.currentVersion, dm.previousVersion = dm.previousVersion, dm.currentVersion
return nil
}
func (dm *DeploymentManager) waitForDeployment(ctx context.Context) error {
cmd := exec.CommandContext(ctx, "kubectl",
"wait", "--for=condition=available",
"--timeout=300s", "deployment/video-processor",
)
return cmd.Run()
}
|
Kubernetes rollback komutu:
1
2
3
4
5
6
7
8
|
# Son başarılı revision'a dön
kubectl rollout undo deployment/video-processor
# Belirli bir revision'a dön
kubectl rollout undo deployment/video-processor --to-revision=3
# Rollout geçmişini görüntüle
kubectl rollout history deployment/video-processor
|
18.6 Canary Deployment
Kademeli deployment için canary stratejisi:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
apiVersion: v1
kind: Service
metadata:
name: video-processor
spec:
selector:
app: video-processor
ports:
- port: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: video-processor-stable
spec:
replicas: 9
template:
metadata:
labels:
app: video-processor
version: stable
spec:
containers:
- name: video-processor
image: video-processor:v1.0.0
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: video-processor-canary
spec:
replicas: 1 # %10 trafik
template:
metadata:
labels:
app: video-processor
version: canary
spec:
containers:
- name: video-processor
image: video-processor:v1.1.0
|
18.7 Pre-Deployment Checklist
Production’a deploy etmeden önce kontrol listesi:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
import (
"context"
"fmt"
"os/exec"
"time"
)
type PreDeploymentChecklist struct {
checks []Check
}
type Check struct {
Name string
Description string
Validate func() error
}
func NewPreDeploymentChecklist() *PreDeploymentChecklist {
return &PreDeploymentChecklist{
checks: []Check{
{
Name: "Unit Tests",
Description: "Tüm unit testler geçiyor mu?",
Validate: func() error {
cmd := exec.Command("go", "test", "./...")
return cmd.Run()
},
},
{
Name: "Integration Tests",
Description: "Integration testler geçiyor mu?",
Validate: func() error {
cmd := exec.Command("go", "test", "-tags=integration", "./...")
return cmd.Run()
},
},
{
Name: "Security Scan",
Description: "Güvenlik taraması yapıldı mı?",
Validate: func() error {
// Trivy, Snyk gibi araçlarla scan
return nil
},
},
{
Name: "Performance Test",
Description: "Performance testleri geçiyor mu?",
Validate: func() error {
// Load test sonuçlarını kontrol et
return nil
},
},
},
}
}
func (pdc *PreDeploymentChecklist) Run() error {
for _, check := range pdc.checks {
fmt.Printf("Checking: %s...\n", check.Name)
if err := check.Validate(); err != nil {
return fmt.Errorf("%s failed: %w", check.Name, err)
}
fmt.Printf("✓ %s passed\n", check.Name)
}
return nil
}
|
19. Güvenlik
Production ortamlarında güvenlik kritik öneme sahiptir. Edge processing sistemleri hassas video verilerini işlediği için güvenlik katmanları mutlaka uygulanmalıdır.
19.1 Credential Management ve Secret Rotation
RTSP URL’lerinde kullanıcı adı ve şifre kullanılıyorsa, environment variable veya secret management kullanın:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
import (
"crypto/tls"
"fmt"
"net/url"
"os"
"time"
)
type CredentialManager struct {
secrets map[string]string
lastRotated time.Time
mutex sync.RWMutex
}
func NewCredentialManager() *CredentialManager {
cm := &CredentialManager{
secrets: make(map[string]string),
}
cm.loadSecrets()
return cm
}
func (cm *CredentialManager) loadSecrets() {
// Kubernetes secrets veya HashiCorp Vault'tan yükle
cm.secrets["rtsp_username"] = os.Getenv("RTSP_USERNAME")
cm.secrets["rtsp_password"] = os.Getenv("RTSP_PASSWORD")
}
func (cm *CredentialManager) GetRTSPURL(baseURL string) string {
cm.mutex.RLock()
defer cm.mutex.RUnlock()
username := cm.secrets["rtsp_username"]
password := cm.secrets["rtsp_password"]
if username != "" && password != "" {
return fmt.Sprintf("rtsp://%s:%s@%s", username, password, baseURL)
}
return baseURL
}
// Log'larda credential'ları maskelemek için yardımcı fonksiyon
func maskCredentials(rawURL string) string {
u, err := url.Parse(rawURL)
if err != nil {
return rawURL
}
if u.User != nil {
u.User = url.UserPassword("***", "***")
}
return u.String()
}
// Secret rotation - periyodik olarak secret'ları yenile
func (cm *CredentialManager) RotateSecrets() {
cm.mutex.Lock()
defer cm.mutex.Unlock()
// Yeni secret'ları yükle
cm.loadSecrets()
cm.lastRotated = time.Now()
}
|
19.2 RTSP Authentication ve TLS
RTSP bağlantılarını güvenli hale getirmek için:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
import (
"crypto/tls"
"net/url"
)
type SecureRTSPClient struct {
tlsConfig *tls.Config
}
func NewSecureRTSPClient(certFile, keyFile string) (*SecureRTSPClient, error) {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, err
}
return &SecureRTSPClient{
tlsConfig: &tls.Config{
Certificates: []tls.Certificate{cert},
MinVersion: tls.VersionTLS12,
},
}, nil
}
// RTSP over TLS (RTSPS) kullanımı
func (src *SecureRTSPClient) GetSecureRTSPURL(baseURL string) string {
u, err := url.Parse(baseURL)
if err != nil {
return baseURL
}
// RTSPS (RTSP over TLS) kullan
if u.Scheme == "rtsp" {
u.Scheme = "rtsps"
}
return u.String()
}
|
19.3 Video Stream Encryption (SRTP)
Gerçek zamanlı video akışlarını şifrelemek için SRTP (Secure Real-time Transport Protocol) kullanın:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
import (
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"io"
)
type StreamEncryptor struct {
aesgcm cipher.AEAD
nonce []byte
}
func NewStreamEncryptor(key []byte) (*StreamEncryptor, error) {
block, err := aes.NewCipher(key)
if err != nil {
return nil, err
}
aesgcm, err := cipher.NewGCM(block)
if err != nil {
return nil, err
}
nonce := make([]byte, aesgcm.NonceSize())
if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
return nil, err
}
return &StreamEncryptor{
aesgcm: aesgcm,
nonce: nonce,
}, nil
}
func (se *StreamEncryptor) EncryptFrame(frameData []byte) ([]byte, error) {
return se.aesgcm.Seal(nil, se.nonce, frameData, nil), nil
}
func (se *StreamEncryptor) DecryptFrame(encryptedData []byte) ([]byte, error) {
return se.aesgcm.Open(nil, se.nonce, encryptedData, nil)
}
|
19.4 API Authentication & Authorization
HTTP API endpoint’leri için authentication ve authorization:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
import (
"fmt"
"net/http"
"time"
"github.com/golang-jwt/jwt/v5"
)
type AuthMiddleware struct {
secretKey []byte
}
func NewAuthMiddleware(secretKey string) *AuthMiddleware {
return &AuthMiddleware{
secretKey: []byte(secretKey),
}
}
func (am *AuthMiddleware) Authenticate(next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
token := r.Header.Get("Authorization")
if token == "" {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
// JWT token doğrula
claims, err := am.validateToken(token)
if err != nil {
http.Error(w, "Invalid token", http.StatusUnauthorized)
return
}
// Role-based authorization
if !am.hasPermission(claims.Role, r.URL.Path) {
http.Error(w, "Forbidden", http.StatusForbidden)
return
}
next(w, r)
}
}
func (am *AuthMiddleware) validateToken(tokenString string) (*Claims, error) {
token, err := jwt.ParseWithClaims(tokenString, &Claims{}, func(token *jwt.Token) (interface{}, error) {
return am.secretKey, nil
})
if err != nil {
return nil, err
}
if claims, ok := token.Claims.(*Claims); ok && token.Valid {
return claims, nil
}
return nil, fmt.Errorf("invalid token")
}
func (am *AuthMiddleware) hasPermission(role, path string) bool {
// Basit role-based authorization
// Production'da daha karmaşık bir sistem kullanılabilir
// Admin tüm endpoint'lere erişebilir
if role == "admin" {
return true
}
// User sadece read-only endpoint'lere erişebilir
if role == "user" {
return path == "/api/stats" || path == "/api/health"
}
// Viewer sadece preview endpoint'lerine erişebilir
if role == "viewer" {
return path == "/preview" || path == "/preview/list"
}
return false
}
type Claims struct {
Username string `json:"username"`
Role string `json:"role"`
jwt.RegisteredClaims
}
|
19.5 Container Security Best Practices
Docker container’ları için güvenlik önlemleri:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
# Non-root user kullan
RUN groupadd -r appuser && useradd -r -g appuser appuser
USER appuser
# Minimal base image kullan
FROM gcr.io/distroless/base-debian11
# Sadece gerekli port'ları expose et
EXPOSE 8080
# Read-only filesystem (mümkünse)
# docker run --read-only --tmpfs /tmp
# Security scanning
# docker scan <image-name>
|
Kubernetes Security Context:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
apiVersion: v1
kind: Pod
spec:
securityContext:
runAsNonRoot: true
runAsUser: 1000
fsGroup: 1000
seccompProfile:
type: RuntimeDefault
containers:
- name: video-processor
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
capabilities:
drop:
- ALL
|
19.6 Certificate Management
TLS sertifikalarının yönetimi:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
import (
"crypto/tls"
"crypto/x509"
"time"
)
type CertificateManager struct {
cert *tls.Certificate
certPool *x509.CertPool
mutex sync.RWMutex
}
func NewCertificateManager(certFile, keyFile, caFile string) (*CertificateManager, error) {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, err
}
caCert, err := os.ReadFile(caFile)
if err != nil {
return nil, err
}
certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("failed to parse CA certificate")
}
return &CertificateManager{
cert: &cert,
certPool: certPool,
}, nil
}
// Certificate expiration kontrolü
func (cm *CertificateManager) CheckExpiration() (time.Duration, error) {
cm.mutex.RLock()
defer cm.mutex.RUnlock()
cert, err := x509.ParseCertificate(cm.cert.Certificate[0])
if err != nil {
return 0, err
}
return time.Until(cert.NotAfter), nil
}
// Sertifika yenileme logic'i için placeholder (ör. ACME / Let's Encrypt)
func (cm *CertificateManager) renewCertificate() error {
// Gerçek bir sistemde ACME client implementasyonu veya harici bir certificate manager ile entegrasyon yapılır
return fmt.Errorf("certificate renewal not implemented")
}
// Auto-renewal için
func (cm *CertificateManager) StartAutoRenewal() {
go func() {
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
for range ticker.C {
timeLeft, err := cm.CheckExpiration()
if err != nil {
continue
}
// 30 günden az kaldıysa yenile
if timeLeft < 30*24*time.Hour {
cm.renewCertificate()
}
}
}()
}
|
Frame data’yı işlemeden önce validate edin:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import (
"errors"
"fmt"
)
func validateFrame(frame *Frame) error {
if frame == nil {
return errors.New("frame is nil")
}
if len(frame.Data) == 0 {
return errors.New("frame data is empty")
}
expectedSize := frame.Width * frame.Height * 3
if len(frame.Data) != expectedSize {
return fmt.Errorf("frame size mismatch: expected %d, got %d", expectedSize, len(frame.Data))
}
// Boyut limitleri
if frame.Width > 4096 || frame.Height > 4096 {
return errors.New("frame dimensions too large")
}
return nil
}
|
19.8 Security Monitoring ve Audit Logging
Güvenlik olaylarını loglama:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
type SecurityLogger struct {
logger *logrus.Logger
}
func (sl *SecurityLogger) LogSecurityEvent(eventType string, details map[string]interface{}) {
sl.logger.WithFields(logrus.Fields{
"event_type": eventType,
"timestamp": time.Now(),
"details": details,
}).Warn("Security event")
}
// Kullanım örnekleri:
// sl.LogSecurityEvent("authentication_failure", map[string]interface{}{
// "ip": "192.168.1.100",
// "username": "admin",
// })
// sl.LogSecurityEvent("unauthorized_access", map[string]interface{}{
// "endpoint": "/api/frames",
// "user": "user123",
// })
|
20. Tam Örnek Uygulama
Tüm parçaları bir araya getiren örnek:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Signal handling
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
log.Println("Shutting down...")
cancel()
}()
// Video kaynağını oluştur
source := NewVideoSource("rtsp://camera:554/stream", 640, 480)
// Pipeline oluştur
pipeline := NewPipeline(source)
// Motion detector ekle
motionDetector := NewMotionDetector(0.05, 640, 480) // %5 threshold
pipeline.AddProcessor(motionDetector)
// Object detector ekle (opsiyonel, hata olursa graceful degradation)
// objectDetector, err := NewObjectDetector("model.onnx", "config.yaml", "classes.txt")
// if err == nil {
// pipeline.AddProcessor(objectDetector)
// }
// Edge decision layer ekle
mqttPub, err := NewMQTTPublisher("tcp://mqtt:1883", "video/events", "video-processor")
if err != nil {
log.Fatalf("Failed to create MQTT publisher: %v", err)
}
decisionLayer := NewEdgeDecisionLayer(0.7, mqttPub, 5*time.Second)
pipeline.AddProcessor(decisionLayer)
// Pipeline'ı başlat (goroutine'de)
pipelineDone := make(chan error, 1)
go func() {
pipelineDone <- pipeline.Run(ctx)
}()
// Shutdown sinyalini bekle
<-ctx.Done()
log.Println("Shutting down gracefully...")
// Graceful shutdown için timeout
shutdownTimeout := 30 * time.Second
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer shutdownCancel()
// Pipeline'ı durdur (zorunlu timeout fallback ile)
shutdownDone := make(chan error, 1)
go func() {
shutdownDone <- pipeline.Shutdown(shutdownCtx)
}()
select {
case err := <-shutdownDone:
if err != nil {
log.Printf("Shutdown error: %v", err)
} else {
log.Println("Pipeline stopped successfully")
}
case <-time.After(30 * time.Second):
log.Println("Force killing after shutdown timeout!")
os.Exit(1)
}
// MQTT bağlantısını kapat (best effort)
if err := mqttPub.Close(); err != nil {
log.Printf("MQTT close error: %v", err)
}
// Pipeline'ın tamamlanmasını bekle
select {
case err := <-pipelineDone:
if err != nil {
log.Printf("Pipeline error: %v", err)
}
default:
}
log.Println("Application stopped")
}
|
21. Frame Recording ve Storage
Önemli event’lerde frame’leri kaydetmek için:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
type FrameRecorder struct {
storagePath string
maxFrames int
frames []*Frame
mutex sync.Mutex
}
func (fr *FrameRecorder) RecordFrame(frame *Frame) error {
fr.mutex.Lock()
defer fr.mutex.Unlock()
if len(fr.frames) >= fr.maxFrames {
// En eski frame'i kaldır
fr.frames = fr.frames[1:]
}
fr.frames = append(fr.frames, frame)
return nil
}
func (fr *FrameRecorder) SaveEvent(eventID string) error {
fr.mutex.Lock()
frames := make([]*Frame, len(fr.frames))
copy(frames, fr.frames)
fr.mutex.Unlock()
// Frame'leri JPEG olarak kaydet
for i, frame := range frames {
jpegData, err := FrameToJPEG(frame, 90)
if err != nil {
return err
}
filename := fmt.Sprintf("%s/%s_frame_%d.jpg", fr.storagePath, eventID, i)
if err := os.WriteFile(filename, jpegData, 0644); err != nil {
return err
}
}
return nil
}
|
22. Alerting ve Bildirimler
Event’ler için bildirim mekanizması:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
type AlertManager struct {
notifiers []Notifier
}
type Notifier interface {
Send(alert *Alert) error
}
type Alert struct {
Level string
Message string
Event *Event
Timestamp time.Time
}
// Email notifier
type EmailNotifier struct {
smtpServer string
recipients []string
}
// Slack notifier
type SlackNotifier struct {
webhookURL string
}
// SMS notifier (Twilio, vb.)
type SMSNotifier struct {
apiKey string
phone string
}
func (am *AlertManager) SendAlert(alert *Alert) {
for _, notifier := range am.notifiers {
go func(n Notifier) {
if err := n.Send(alert); err != nil {
log.Printf("Alert gönderilemedi: %v", err)
}
}(notifier)
}
}
|
23. Hardware Gereksinimleri
Minimum Gereksinimler (Motion Detection Only)
- CPU: 2 core, 1.5 GHz+
- RAM: 512 MB
- Storage: 1 GB (OS + uygulama)
- Network: 10 Mbps
Önerilen Gereksinimler (Object Detection)
- CPU: 4 core, 2.0 GHz+
- RAM: 2 GB
- Storage: 5 GB
- Network: 100 Mbps
- GPU (opsiyonel): NVIDIA Jetson Nano, Intel Neural Compute Stick
Production Gereksinimleri (Multi-Camera)
- CPU: 8+ core, 2.5 GHz+
- RAM: 8 GB+
- Storage: 50 GB+ (event recording için)
- Network: 1 Gbps
- GPU: NVIDIA Jetson Xavier NX veya dedicated AI accelerator
24. Gerçek Dünya Deneyimleri ve Karşılaşılan Zorluklar
Bu projeyi geliştirirken karşılaştığımız bazı zorluklar ve çözümler:
24.1 FFmpeg Process Yönetimi
Problem: FFmpeg process’i beklenmedik şekilde crash olabiliyor veya bağlantı kopabiliyor.
Çözüm: Sürekli monitoring ve otomatik restart mekanizması:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
func (vs *VideoSource) StartWithMonitoring(ctx context.Context) error {
for {
if err := vs.Start(ctx); err != nil {
log.Printf("FFmpeg başlatılamadı: %v, 5 saniye sonra tekrar deneniyor...", err)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(5 * time.Second):
continue
}
}
// Process'in çalışıp çalışmadığını kontrol et
select {
case <-ctx.Done():
return ctx.Err()
case err := <-vs.Errors():
log.Printf("FFmpeg hatası: %v, yeniden başlatılıyor...", err)
vs.Stop()
time.Sleep(2 * time.Second)
}
}
}
|
24.2 Memory Leak’ler
Problem: Frame buffer’ları düzgün temizlenmediğinde bellek kullanımı sürekli artıyor.
Çözüm: sync.Pool kullanımı ve düzenli GC çağrıları:
1
2
3
4
5
6
7
8
9
|
// Her 1000 frame'de bir GC çağır
frameCount := 0
for frame := range frames {
frameCount++
if frameCount%1000 == 0 {
runtime.GC()
}
// Frame işleme...
}
|
24.3 Network Bağlantı Sorunları
Problem: RTSP bağlantıları zaman zaman kopuyor, özellikle WiFi üzerinden.
Çözüm: TCP transport kullanımı ve connection timeout ayarları:
1
2
|
# UDP yerine TCP kullan (daha güvenilir ama biraz daha yavaş)
ffmpeg -rtsp_transport tcp -i rtsp://...
|
24.4 Frame Rate Tutarsızlıkları
Problem: Kaynak 30 FPS sağlıyor ama pipeline sadece 10-15 FPS işleyebiliyor.
Çözüm: Frame skipping ve adaptive processing:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
type AdaptiveProcessor struct {
targetFPS int
currentFPS float64
skipRatio int
lastFrameTime time.Time
}
func (ap *AdaptiveProcessor) ShouldProcess() bool {
now := time.Now()
if ap.lastFrameTime.IsZero() {
ap.lastFrameTime = now
return true
}
elapsed := now.Sub(ap.lastFrameTime)
currentFPS := 1.0 / elapsed.Seconds()
if currentFPS > float64(ap.targetFPS)*1.2 {
// Çok hızlı, bazı frame'leri atla
ap.skipRatio++
return ap.skipRatio%2 == 0
}
ap.skipRatio = 0
ap.lastFrameTime = now
return true
}
|
24.5 Object Detection Gecikmeleri
Problem: Object detection model’i çok yavaş, pipeline’ı tıkıyor.
Çözüm: Async processing ve frame buffering:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
func (od *ObjectDetector) ProcessAsync(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
// Detection'ları async olarak çalıştır
detectionResults := make(map[int]*Frame)
resultMutex := sync.RWMutex{}
frameID := 0
for frame := range in {
currentID := frameID
frameID++
// Frame'i hemen geçir (detection olmadan)
select {
case out <- frame:
case <-ctx.Done():
return
}
// Detection'ı arka planda yap
go func(f *Frame, id int) {
detections := od.detect(f)
resultMutex.Lock()
detectionResults[id] = f
f.Metadata["detections"] = detections
resultMutex.Unlock()
}(frame, currentID)
}
}
|
24.6 Multi-Camera Yönetimi
Problem: Birden fazla kameradan gelen stream’leri yönetmek karmaşık.
Çözüm: Camera manager pattern:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
type CameraManager struct {
cameras map[string]*VideoSource
pipelines map[string]*Pipeline
mutex sync.RWMutex
}
func (cm *CameraManager) AddCamera(id, url string, width, height int) error {
cm.mutex.Lock()
defer cm.mutex.Unlock()
source := NewVideoSource(url, width, height)
pipeline := NewPipeline(source)
cm.cameras[id] = source
cm.pipelines[id] = pipeline
return pipeline.Run(context.Background())
}
func (cm *CameraManager) RemoveCamera(id string) error {
cm.mutex.Lock()
defer cm.mutex.Unlock()
if pipeline, exists := cm.pipelines[id]; exists {
pipeline.Stop()
delete(cm.pipelines, id)
}
if source, exists := cm.cameras[id]; exists {
source.Stop()
delete(cm.cameras, id)
}
return nil
}
|
25. Troubleshooting Rehberi
25.1 FFmpeg Bağlantı Hatası
Problem: Connection refused veya Connection timeout
Çözüm:
- RTSP URL’ini kontrol edin:
ffmpeg -rtsp_transport tcp -i <url> -t 1 -f null -
- Firewall kurallarını kontrol edin
- Kamera ayarlarında RTSP port’unun açık olduğundan emin olun
- TCP transport kullanmayı deneyin (UDP yerine)
25.2 Yüksek CPU Kullanımı
Problem: CPU %100’e yakın, frame rate düşüyor
Çözüm:
- Frame rate’i düşürün (target FPS ayarı)
- Çözünürlüğü azaltın (640x480 → 320x240)
- Object detection’ı devre dışı bırakın veya async yapın
- Daha güçlü hardware kullanın
25.3 Memory Leak
Problem: Bellek kullanımı sürekli artıyor
Çözüm:
sync.Pool kullanımını kontrol edin
- Frame buffer’ların düzgün temizlendiğinden emin olun
- Periyodik GC çağrıları ekleyin
pprof ile memory profile alın
26. Maliyet Analizi
26.1 Cloud vs Edge Processing Karşılaştırması
Cloud Video Processing (AWS Rekognition, Azure Video Analyzer):
- Video başına: ~$0.10-0.50/dakika
- 10 kamera, 24/7: ~$1,440-7,200/ay
- Network maliyeti: ~$200-500/ay
- Toplam: ~$1,640-7,700/ay
Edge Processing (Bu Sistem):
- Edge device: ~$50-200 (bir kerelik)
- Cloud storage (sadece event’ler): ~$10-50/ay
- Network (sadece event’ler): ~$5-20/ay
- Toplam: ~$15-70/ay + bir kerelik hardware
Tasarruf: %95-99 maliyet azalması (hardware maliyeti hariç)
27. Production Best Practices
27.1 Resource Limits
Docker/Kubernetes’te resource limit’leri mutlaka ayarlayın:
1
2
3
4
5
6
7
|
resources:
requests:
memory: "512Mi"
cpu: "1000m"
limits:
memory: "1Gi"
cpu: "2000m"
|
27.2 Graceful Shutdown
Sistem kapanırken tüm goroutine’lerin düzgün şekilde sonlanmasını sağlayın:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func (p *Pipeline) Shutdown(ctx context.Context) error {
// Timeout / cancellation için dışarıdan verilen context'i kullan
done := make(chan error, 1)
go func() {
done <- p.Stop()
}()
select {
case err := <-done:
return err
case <-ctx.Done():
return ctx.Err()
}
}
|
27.3 Circuit Breaker Pattern
External servislere (object detection API) bağlanırken circuit breaker kullanın:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
import "github.com/sony/gobreaker"
type CircuitBreakerDetector struct {
detector *ExternalDetector
cb *gobreaker.CircuitBreaker
}
func NewCircuitBreakerDetector(detector *ExternalDetector) *CircuitBreakerDetector {
cb := gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "ObjectDetection",
Timeout: 5 * time.Second,
ReadyToTrip: func(counts gobreaker.Counts) bool {
return counts.ConsecutiveFailures > 5
},
})
return &CircuitBreakerDetector{
detector: detector,
cb: cb,
}
}
func (cbd *CircuitBreakerDetector) Detect(frame *Frame) ([]Detection, error) {
result, err := cbd.cb.Execute(func() (interface{}, error) {
return cbd.detector.callInferenceService(frame)
})
if err != nil {
return nil, err
}
return result.([]Detection), nil
}
|
27.4 Rate Limiting
Event gönderiminde rate limiting kullanın:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import "golang.org/x/time/rate"
type RateLimitedPublisher struct {
publisher EventPublisher
limiter *rate.Limiter
}
func NewRateLimitedPublisher(publisher EventPublisher, eventsPerSecond int) *RateLimitedPublisher {
return &RateLimitedPublisher{
publisher: publisher,
limiter: rate.NewLimiter(rate.Limit(eventsPerSecond), eventsPerSecond),
}
}
func (rlp *RateLimitedPublisher) Publish(event *Event) error {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
if err := rlp.limiter.Wait(ctx); err != nil {
return fmt.Errorf("rate limit exceeded: %w", err)
}
return rlp.publisher.Publish(event)
}
|
27.5 Structured Logging
Tüm log’ları structured format’ta tutun:
1
2
3
4
5
6
7
8
|
logger.WithFields(logrus.Fields{
"camera_id": cameraID,
"frame_id": frameID,
"event_type": event.Type,
"confidence": event.Confidence,
"processing_ms": processingTime.Milliseconds(),
"timestamp": time.Now().Unix(),
}).Info("Event detected")
|
27.6 Metrics Collection
Önemli metrikleri toplayın ve export edin:
- Frame processing rate (FPS)
- Detection accuracy
- Error rates
- Memory usage
- CPU usage
- Network latency
- Event publish success rate
27.7 Configuration Management (Viper)
Sensitive bilgileri environment variable veya secret management’tan alın:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
import "github.com/spf13/viper"
func LoadConfig() (*Config, error) {
viper.SetConfigName("config")
viper.SetConfigType("yaml")
viper.AddConfigPath(".")
viper.AddConfigPath("/etc/video-processor/")
// Environment variable'ları override edebilir
viper.SetEnvPrefix("VIDEO")
viper.AutomaticEnv()
if err := viper.ReadInConfig(); err != nil {
return nil, err
}
var config Config
if err := viper.Unmarshal(&config); err != nil {
return nil, err
}
return &config, nil
}
|
28. Alternatif Yaklaşımlar ve Trade-off’lar
28.1 FFmpeg vs GStreamer
FFmpeg:
- ✅ Daha yaygın, iyi dokümante edilmiş
- ✅ Çok sayıda format desteği
- ❌ Process overhead (her frame için pipe I/O)
- ❌ Biraz daha yavaş
GStreamer:
- ✅ Daha optimize, daha hızlı
- ✅ Plugin mimarisi
- ❌ Daha az dokümante
- ❌ Kurulum daha karmaşık
Karar: FFmpeg’i seçtik çünkü daha yaygın ve dokümante edilmiş. GStreamer daha performanslı olabilir ama FFmpeg bizim ihtiyaçlarımız için yeterli.
28.2 GoCV vs External Service
GoCV (In-process):
- ✅ Düşük latency
- ✅ Network bağımlılığı yok
- ❌ Model binary’ye dahil (büyük binary)
- ❌ CPU kullanımı yüksek
External Service:
- ✅ Model’i ayrı serviste çalıştırma (GPU kullanımı kolay)
- ✅ Daha küçük binary
- ❌ Network latency
- ❌ Ek servis yönetimi
Karar: Küçük modeller için GoCV, büyük modeller için external service kullanıyoruz.
28.3 MQTT vs Kafka vs Webhook
MQTT:
- ✅ Hafif, edge için ideal
- ✅ Low latency
- ✅ QoS desteği
- ❌ Message ordering garantisi yok
Kafka:
- ✅ Yüksek throughput
- ✅ Message ordering
- ❌ Daha ağır, edge için fazla
- ❌ Daha yüksek latency
Webhook:
- ✅ Basit HTTP
- ✅ Kolay entegrasyon
- ❌ Retry mekanizması yok
- ❌ Network bağımlılığı
Karar: Edge’den cloud’a event gönderimi için MQTT, yüksek throughput gerektiğinde Kafka kullanıyoruz.
28.4 FFmpeg Yerine Alternatif Yaklaşımlar
FFmpeg haricinde Go tabanlı video işleme kütüphaneleri de mevcuttur:
github.com/3d0c/gmf (Go Media Framework):
- ✅ Pure Go implementasyonu
- ✅ FFmpeg bağımlılığı yok
- ❌ Format desteği sınırlı
- ❌ Aktif geliştirme yavaş
github.com/asticode/go-astits:
- ✅ MPEG-TS stream parsing
- ✅ Pure Go
- ❌ Sadece MPEG-TS formatı
Neden FFmpeg Tercih Edildi?
- Format desteği: 100+ video/audio formatı
- Stabilite: Yıllardır production’da kullanılıyor
- Performans: C ile yazılmış, optimize edilmiş
- Dokümantasyon: Kapsamlı ve güncel
- Topluluk desteği: Geniş kullanıcı tabanı
Karar: FFmpeg’i seçtik çünkü format desteği, stabilite ve performans kritikti. Go tabanlı alternatifler gelecekte daha olgunlaşabilir.
29. Machine Learning Model Deployment
Edge processing sistemlerinde ML model’lerinin deployment’ı kritik bir konudur. Model versioning, A/B testing ve canary deployment stratejileri:
29.1 Model Versioning
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
type ModelManager struct {
models map[string]*Model
activeModel string
mutex sync.RWMutex
}
type Model struct {
Version string
Path string
LoadedAt time.Time
Performance ModelMetrics
}
func (mm *ModelManager) LoadModel(version, path string) error {
mm.mutex.Lock()
defer mm.mutex.Unlock()
// Model'i yükle
model := &Model{
Version: version,
Path: path,
LoadedAt: time.Now(),
}
mm.models[version] = model
// İlk model ise aktif yap
if mm.activeModel == "" {
mm.activeModel = version
}
return nil
}
func (mm *ModelManager) SwitchModel(version string) error {
mm.mutex.Lock()
defer mm.mutex.Unlock()
if _, exists := mm.models[version]; !exists {
return fmt.Errorf("model version %s not found", version)
}
mm.activeModel = version
return nil
}
|
29.2 A/B Testing
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
import (
"fmt"
"math/rand"
"sync"
"time"
)
type ABTester struct {
modelA *Model
modelB *Model
splitRatio float64 // 0.0-1.0, modelA için
results map[string]ABResult
mutex sync.RWMutex
}
type ABResult struct {
ModelAAccuracy float64
ModelBAccuracy float64
ModelALatency time.Duration
ModelBLatency time.Duration
}
func (ab *ABTester) Predict(frame *Frame) ([]Detection, error) {
// Rastgele model seç (split ratio'ya göre)
useModelA := rand.Float64() < ab.splitRatio
var model *Model
var modelName string
if useModelA {
model = ab.modelA
modelName = "A"
} else {
model = ab.modelB
modelName = "B"
}
start := time.Now()
detections, err := model.Predict(frame)
latency := time.Since(start)
// Sonuçları kaydet
ab.recordResult(modelName, detections, latency)
return detections, err
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
type ModelMetrics struct {
TotalInferences int64
AverageLatency time.Duration
Accuracy float64
FalsePositives int64
FalseNegatives int64
mutex sync.RWMutex
}
func (mm *ModelMetrics) RecordInference(latency time.Duration, correct bool) {
mm.mutex.Lock()
defer mm.mutex.Unlock()
mm.TotalInferences++
// Moving average latency
if mm.AverageLatency == 0 {
mm.AverageLatency = latency
} else {
mm.AverageLatency = (mm.AverageLatency + latency) / 2
}
// Accuracy hesapla
if correct {
mm.Accuracy = float64(mm.TotalInferences-mm.FalsePositives-mm.FalseNegatives) / float64(mm.TotalInferences)
}
}
|
29.4 Donanım Hızlandırma ve GPU Desteği
Edge AI cihazlarıyla entegrasyon için GPU desteği kritik öneme sahiptir:
NVIDIA Jetson Entegrasyonu
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import "gocv.io/x/gocv"
func NewGPUAcceleratedDetector(modelPath string) (*ObjectDetector, error) {
net := gocv.ReadNet(modelPath, "")
if net.Empty() {
return nil, fmt.Errorf("failed to load model")
}
// CUDA backend kullan
net.SetPreferableBackend(gocv.NetBackendCUDA)
net.SetPreferableTarget(gocv.NetTargetCUDA)
return &ObjectDetector{net: net}, nil
}
|
Intel Neural Compute Stick (OpenVINO)
1
2
3
|
// OpenVINO backend ile
net.SetPreferableBackend(gocv.NetBackendOpenVINO)
net.SetPreferableTarget(gocv.NetTargetVPU) // Intel NCS için
|
CUDA, OpenCL, Vulkan Karşılaştırması
| API |
Platform |
Performans |
Edge Uygunluğu |
| CUDA |
NVIDIA |
⭐⭐⭐⭐⭐ |
Jetson serisi |
| OpenCL |
Cross-platform |
⭐⭐⭐ |
Genel GPU’lar |
| Vulkan |
Cross-platform |
⭐⭐⭐⭐ |
Modern GPU’lar |
Öneri: NVIDIA Jetson için CUDA, genel kullanım için OpenVINO tercih edilir.
30. Video Quality Metrics
Video kalitesini ölçmek için çeşitli metrikler kullanılabilir:
30.1 PSNR (Peak Signal-to-Noise Ratio)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import "math"
func CalculatePSNR(original, processed *Frame) float64 {
mse := calculateMSE(original, processed)
if mse == 0 {
return math.Inf(1) // Perfect match
}
maxPixelValue := 255.0
psnr := 20 * math.Log10(maxPixelValue/math.Sqrt(mse))
return psnr
}
func calculateMSE(original, processed *Frame) float64 {
if len(original.Data) != len(processed.Data) {
return math.MaxFloat64
}
var sum float64
for i := 0; i < len(original.Data); i++ {
diff := float64(original.Data[i]) - float64(processed.Data[i])
sum += diff * diff
}
return sum / float64(len(original.Data))
}
|
30.2 SSIM (Structural Similarity Index)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
import "math"
func CalculateSSIM(original, processed *Frame) float64 {
// Basitleştirilmiş SSIM implementasyonu
// Tam implementasyon için luminance, contrast, structure hesaplamaları gerekir
mu1 := calculateMean(original)
mu2 := calculateMean(processed)
sigma1 := calculateStdDev(original, mu1)
sigma2 := calculateStdDev(processed, mu2)
sigma12 := calculateCovariance(original, processed, mu1, mu2)
c1 := 0.01 * 255 * 0.01 * 255
c2 := 0.03 * 255 * 0.03 * 255
numerator := (2*mu1*mu2 + c1) * (2*sigma12 + c2)
denominator := (mu1*mu1 + mu2*mu2 + c1) * (sigma1*sigma1 + sigma2*sigma2 + c2)
return numerator / denominator
}
func calculateMean(frame *Frame) float64 {
var sum float64
for _, v := range frame.Data {
sum += float64(v)
}
return sum / float64(len(frame.Data))
}
func calculateStdDev(frame *Frame, mean float64) float64 {
var sum float64
for _, v := range frame.Data {
diff := float64(v) - mean
sum += diff * diff
}
return math.Sqrt(sum / float64(len(frame.Data)))
}
func calculateCovariance(f1, f2 *Frame, mean1, mean2 float64) float64 {
var sum float64
minLen := len(f1.Data)
if len(f2.Data) < minLen {
minLen = len(f2.Data)
}
for i := 0; i < minLen; i++ {
sum += (float64(f1.Data[i]) - mean1) * (float64(f2.Data[i]) - mean2)
}
return sum / float64(minLen)
}
|
31. Load Balancing ve High Availability
Production ortamlarında yüksek erişilebilirlik için load balancing ve failover mekanizmaları:
31.1 Multi-Instance Deployment
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
type LoadBalancer struct {
instances []*VideoProcessor
current int
mutex sync.Mutex
}
func (lb *LoadBalancer) ProcessFrame(frame *Frame) error {
lb.mutex.Lock()
instance := lb.instances[lb.current]
lb.current = (lb.current + 1) % len(lb.instances)
lb.mutex.Unlock()
return instance.Process(frame)
}
func (lb *LoadBalancer) HealthCheck() {
lb.mutex.Lock()
defer lb.mutex.Unlock()
for i, instance := range lb.instances {
if !instance.IsHealthy() {
// Sağlıksız instance'ı listeden çıkar
lb.removeInstance(i)
}
}
}
func (lb *LoadBalancer) removeInstance(index int) {
if index < 0 || index >= len(lb.instances) {
return
}
// Instance'ı listeden çıkar
lb.instances = append(lb.instances[:index], lb.instances[index+1:]...)
// Current index'i güncelle
if lb.current >= len(lb.instances) {
lb.current = 0
}
}
|
31.2 Failover Mechanism
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
type FailoverManager struct {
primary *VideoProcessor
secondary *VideoProcessor
active *VideoProcessor
mutex sync.RWMutex
}
func (fm *FailoverManager) ProcessFrame(frame *Frame) error {
fm.mutex.RLock()
active := fm.active
fm.mutex.RUnlock()
err := active.Process(frame)
if err != nil {
// Failover
fm.switchToSecondary()
return fm.active.Process(frame)
}
return nil
}
func (fm *FailoverManager) switchToSecondary() {
fm.mutex.Lock()
defer fm.mutex.Unlock()
if fm.active == fm.primary {
fm.active = fm.secondary
} else {
fm.active = fm.primary
}
}
|
32. Sonuç
Bu makalede, production ortamında kullanılan bir gerçek zamanlı video analiz sisteminin temel bileşenlerini ve mimarisini paylaştık. Go ile edge processing yaklaşımı, yüksek performanslı, ölçeklenebilir ve maliyet etkin sistemler geliştirmeyi mükemmel bir şekilde mümkün kılar. Go’nun goroutine modeli, video işleme pipeline’ları için ideal bir yapı sunar ve channel’lar üzerinden frame akışını kolayca yönetmemizi sağlar.
32.1 Proje Deneyimlerimiz
Bu sistemi geliştirirken ve production’da kullanırken edindiğimiz temel deneyimler:
- Modüler mimari kritik - Her bileşenin bağımsız çalışabilmesi, sistemin bakımını ve geliştirmesini kolaylaştırıyor
- Error handling ve resilience - Production’da her şey yolunda gitmeyebilir, bu yüzden graceful degradation ve retry mekanizmaları şart
- Monitoring olmadan kör uçuyorsunuz - Sistemin sağlığını ve performansını sürekli izlemek gerekiyor
- Edge processing gerçekten maliyet tasarrufu sağlıyor - Cloud processing’e göre %95+ tasarruf mümkün
- Go’nun eşzamanlılık modeli video işleme için ideal - Goroutine ve channel yapısı, pipeline’ları yönetmeyi çok kolaylaştırıyor
- C’den Go’ya geçiş deneyimi - C ile yazılmış sistemin Go ile yeniden yazılması, hem performans hem de geliştirici deneyimi açısından önemli iyileştirmeler sağladı
32.2 Özet
- Edge processing yaklaşımı, gecikmeyi azaltır ve bant genişliği kullanımını optimize eder
- Go’nun eşzamanlılık modeli, video pipeline’larını verimli bir şekilde yönetmemizi sağlar
- Modüler mimari, farklı analiz tekniklerini kolayca ekleyip çıkarabilmemizi sağlar
- Graceful degradation ve error handling, production ortamlarında güvenilir sistemler kurmamıza olanak tanır
- ROI detection ve network optimizasyonu gibi teknikler, sistem performansını önemli ölçüde artırır
- Machine learning model deployment stratejileri, sistemin sürekli gelişmesini sağlar
32.3 Sonraki Adımlar
- GPU desteği - NVIDIA Jetson, Intel Neural Compute Stick gibi edge AI cihazlarıyla entegrasyon (Bölüm 29.4’te başlangıç yapıldı)
- Distributed processing - Birden fazla kamera için distributed edge processing
- Real-time streaming - Analiz edilmiş frame’leri gerçek zamanlı olarak stream etme
- Advanced analytics - Yüz tanıma, davranış analizi, tracking algoritmaları
- Edge-to-cloud sync - Analiz sonuçlarının cloud’da birleştirilmesi ve raporlama
- WebRTC entegrasyonu - Browser tabanlı real-time preview ve kontrol
- Advanced compression - AV1 codec desteği ve adaptive compression
32.4 İleri Seviye Konular (Ayrı Makale Konuları)
Bu makale temel ve orta seviye konuları kapsamaktadır. Aşağıdaki konular için ayrı makaleler önerilir:
Dağıtık Edge Processing:
- Leader election ve consensus algoritmaları
- Edge-cloud senkronizasyonu (eventual consistency)
- Multi-edge device koordinasyonu
CI/CD ve GitOps:
- Multi-architecture Docker image’ları (arm/v7, arm64, amd64)
- ArgoCD, Flux ile edge deployment
- Automated testing ve deployment pipelines
Monitoring & Observability:
- Distributed tracing (Jaeger, OpenTelemetry)
- Anomali tespiti için metrik analiz
- Advanced alerting stratejileri
Güvenlik Derinlemesine:
- SRTP, DTLS video stream şifreleme
- ONVIF, digest auth kamera kimlik doğrulama
- Container güvenliği (AppArmor, Seccomp)
Yasal ve Etik:
- GDPR, KVKK uyumluluğu
- Video kayıt saklama ve imha politikaları
- Privacy-by-design yaklaşımları
Performance Optimization:
- Model quantization, pruning, distillation
- Hardware codec entegrasyonu (VA-API, VDPAU, NVDEC)
- Memory management optimizasyonları (HugeTLB, alignment)
Network Protocols:
- SRT (Secure Reliable Transport)
- QUIC protokolü üzerinden streaming
- Time-Sensitive Networking (TSN) endüstriyel ortamlar için
32.4 Kod Örnekleri ve Repository
Bu makalede paylaşılan kod örnekleri, production ortamında kullanılan bir sistemin temel bileşenlerinden oluşmaktadır. Tam implementasyon ve ek özellikler için:
Not: Kod örnekleri ve tam implementasyon, proje gereksinimlerine göre özelleştirilmiştir. Bu makaledeki örnekler, sistemin temel mimarisini ve yaklaşımını göstermektedir.
32.6 Öğrenme Kaynakları
32.7 Sonuç ve Öneriler
Bu sistem, production ortamında başarıyla çalışmaktadır ve aşağıdaki avantajları sağlamıştır:
- %95+ maliyet tasarrufu - Cloud processing yerine edge processing
- <100ms latency - Gerçek zamanlı karar verme
- Ölçeklenebilir mimari - Her kamera bağımsız çalışır
- Güvenilir sistem - Graceful degradation ve error recovery
Başlangıç için öneriler:
- Önce tek kamera ile başlayın
- Motion detection ile test edin
- Object detection’ı sonra ekleyin
- Production’a geçmeden önce load test yapın
- Monitoring ve alerting’i mutlaka kurun
Not: Bu makale teknik bir rehber olarak hazırlanmıştır. Üretim ortamında kullanmadan önce kapsamlı test yapılması önerilir.