Siber güvenlik alanında makine öğrenmesi, özellikle anomali tespiti için güçlü bir araç haline geldi. Bu yazıda, ağ trafiğinde ve sistem davranışlarında anormallikleri tespit etmek için kullanılan ML tekniklerini inceleyeceğiz.

Anomali Tespiti Nedir?

Anomali tespiti, normal davranış kalıplarından sapmaları belirleme sürecidir. Siber güvenlikte bu, potansiyel tehditleri erken aşamada tespit etmemizi sağlar.

Veri Hazırlığı


import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# Veri yükleme
def load_network_data(file_path):
    """Ağ trafiği verilerini yükle ve ön işle"""
    data = pd.read_csv(file_path)
    
    # Özellik mühendisliği
    data['packet_rate'] = data['packets'] / data['duration']
    data['byte_rate'] = data['bytes'] / data['duration']
    
    return data

# Örnek veri oluşturma (gerçek uygulamada kendi verilerinizi yükleyeceksiniz)
data = pd.DataFrame(np.random.rand(100, 5), columns=['packets', 'bytes', 'duration', 'src_ip', 'dst_ip'])
data['packet_rate'] = data['packets'] / data['duration']
data['byte_rate'] = data['bytes'] / data['duration']

features = data[['packet_rate', 'byte_rate', 'packets', 'bytes', 'duration']]

# Özellikleri normalize et
scaler = StandardScaler()
features_normalized = scaler.fit_transform(features)

# Eğitim ve test setlerine ayırma (gözetimsiz öğrenme için y_train/y_test genellikle kullanılmaz, ama tutarlılık için eklenmiştir)
X_train, X_test, _, _ = train_test_split(features_normalized, np.zeros(len(features_normalized)), test_size=0.3, random_state=42)

Unsupervised Learning Yaklaşımları

1. Isolation Forest

Isolation Forest, anomalileri izole etmenin normal verilere göre daha kolay olduğu prensibine dayanır:


import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor

# Veri yükleme
def load_network_data(file_path):
    """Ağ trafiği verilerini yükle ve ön işle"""
    data = pd.read_csv(file_path)
    
    # Özellik mühendisliği
    data['packet_rate'] = data['packets'] / data['duration']
    data['byte_rate'] = data['bytes'] / data['duration']
    
    return data

# Örnek veri oluşturma (gerçek uygulamada kendi verilerinizi yükleyeceksiniz)
data = pd.DataFrame(np.random.rand(100, 5), columns=['packets', 'bytes', 'duration', 'src_ip', 'dst_ip'])
data['packet_rate'] = data['packets'] / data['duration']
data['byte_rate'] = data['bytes'] / data['duration']

features = data[['packet_rate', 'byte_rate', 'packets', 'bytes', 'duration']]

# Özellikleri normalize et
scaler = StandardScaler()
features_normalized = scaler.fit_transform(features)

# Eğitim ve test setlerine ayırma (gözetimsiz öğrenme için y_train/y_test genellikle kullanılmaz, ama tutarlılık için eklenmiştir)
X_train, X_test, _, _ = train_test_split(features_normalized, np.zeros(len(features_normalized)), test_size=0.3, random_state=42)

# Model oluşturma
iso_forest = IsolationForest(
    contamination=0.1,  # Beklenen anomali oranı
    random_state=42,
    n_estimators=100
)

# Modeli eğit
iso_forest.fit(X_train)

# Tahmin yap (-1: anomali, 1: normal)
predictions_iso = iso_forest.predict(X_test)
anomaly_scores_iso = iso_forest.decision_function(X_test)

2. Local Outlier Factor (LOF) ile Anomali Tespiti

LOF, bir veri noktasının komşularına göre ne kadar izole olduğunu ölçer. Yoğunluk tabanlı bir anomali tespit algoritmasıdır:


import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor

# Veri yükleme
def load_network_data(file_path):
    """Ağ trafiği verilerini yükle ve ön işle"""
    data = pd.read_csv(file_path)
    
    # Özellik mühendisliği
    data['packet_rate'] = data['packets'] / data['duration']
    data['byte_rate'] = data['bytes'] / data['duration']
    
    return data

# Örnek veri oluşturma (gerçek uygulamada kendi verilerinizi yükleyeceksiniz)
data = pd.DataFrame(np.random.rand(100, 5), columns=['packets', 'bytes', 'duration', 'src_ip', 'dst_ip'])
data['packet_rate'] = data['packets'] / data['duration']
data['byte_rate'] = data['bytes'] / data['duration']

features = data[['packet_rate', 'byte_rate', 'packets', 'bytes', 'duration']]

# Özellikleri normalize et
scaler = StandardScaler()
features_normalized = scaler.fit_transform(features)

# Eğitim ve test setlerine ayırma (gözetimsiz öğrenme için y_train/y_test genellikle kullanılmaz, ama tutarlılık için eklenmiştir)
X_train, X_test, _, _ = train_test_split(features_normalized, np.zeros(len(features_normalized)), test_size=0.3, random_state=42)

# Model oluşturma
lof = LocalOutlierFactor(
    n_neighbors=20,  # Komşu sayısı
    contamination=0.1,  # Beklenen anomali oranı
    novelty=True # Yeni veri üzerinde tahmin yapmak için True olmalı
)

# Modeli eğit (fit_predict kullanmayın, novelty=True olduğunda fit ve predict ayrı kullanılır)
lof.fit(X_train)

# Tahmin yap (-1: anomali, 1: normal)
predictions_lof = lof.predict(X_test)
anomaly_scores_lof = lof.decision_function(X_test) # LOF için decision_function değeri negatif, daha düşük değerler daha anormaldir.

One-Class SVM

Normal verilerin etrafında bir sınır oluşturur:


from sklearn.svm import OneClassSVM

# One-Class SVM modeli
oc_svm = OneClassSVM(
    kernel='rbf',
    gamma='scale',
    nu=0.1  # Anomali oranı üst sınırı
)

oc_svm.fit(X_train)
predictions_ocsvm = oc_svm.predict(X_test)

Gerçek Zamanlı Anomali Tespiti

Streaming veri için online learning yaklaşımı:


import river
from river import anomaly
from river import preprocessing

class RealTimeAnomalyDetector:
    def __init__(self):
        self.scaler = preprocessing.StandardScaler()
        self.model = anomaly.HalfSpaceTrees(
            n_trees=10,
            height=8,
            window_size=250,
            seed=42
        )
    
    def update_and_predict(self, x):
        # Veriyi normalize et
        x_scaled = self.scaler.learn_one(x).transform_one(x)
        
        # Anomali skoru
        score = self.model.score_one(x_scaled)
        
        # Modeli güncelle
        self.model.learn_one(x_scaled)
        
        return score

# Kullanım
detector = RealTimeAnomalyDetector()
# Gerçek zamanlı veri akışı (örnek bir liste olarak gösterilmiştir)
stream = [{"feature1": 0.5, "feature2": 0.2}, {"feature1": 0.6, "feature2": 0.3}, {"feature1": 0.1, "feature2": 0.9}] # Örnek veri
threshold = 0.7  # Anomali eşik değeri
def alert(message): print(message) # Basit bir alarm fonksiyonu
for data_point in stream:
    anomaly_score = detector.update_and_predict(data_point)
    if anomaly_score > threshold:
        alert("Anomali tespit edildi!")

Özellik Seçimi ve Boyut İndirgeme


from sklearn.decomposition import PCA

# PCA ile boyut indirgeme
pca = PCA(n_components=0.95)  # %95 varyansı koru
X_reduced = pca.fit_transform(X_train)

print(f"Orijinal boyut: {X_train.shape[1]}")
print(f"İndirgenmiş boyut: {X_reduced.shape[1]}")

Model Değerlendirme


from sklearn.metrics import classification_report, roc_auc_score
import matplotlib.pyplot as plt

def evaluate_anomaly_detector(y_true, y_pred, scores):
    """Anomali tespit modelini değerlendir"""
    
    # Sınıflandırma raporu
    print(classification_report(y_true, y_pred))
    
    # ROC AUC
    roc_auc = roc_auc_score(y_true, scores)
    print(f"ROC AUC Score: {roc_auc:.4f}")
    
    # Skor dağılımı
    plt.figure(figsize=(10, 6))
    plt.hist(scores[y_true == 0], bins=50, alpha=0.7, label='Normal')
    plt.hist(scores[y_true == 1], bins=50, alpha=0.7, label='Anomali')
    plt.xlabel('Anomali Skoru')
    plt.ylabel('Frekans')
    plt.legend()
    plt.title('Anomali Skorlarının Dağılımı')
    plt.show()

Hibrit Yaklaşım: Ensemble Modeller


class EnsembleAnomalyDetector:
    def __init__(self):
        self.models = {
            'isolation_forest': IsolationForest(contamination=0.1),
            'one_class_svm': OneClassSVM(nu=0.1),
            'lof': LocalOutlierFactor(novelty=True, contamination=0.1)
        }
        self.weights = {'isolation_forest': 0.4, 'one_class_svm': 0.3, 'lof': 0.3}
    
    def fit(self, X):
        for name, model in self.models.items():
            model.fit(X)
    
    def predict(self, X):
        scores = {}
        for name, model in self.models.items():
            if hasattr(model, 'decision_function'):
                scores[name] = model.decision_function(X)
            else:
                scores[name] = model.score_samples(X)
        
        # Not: Farklı modellerin skorları farklı ölçeklerde olabilir.
        # Gerçek bir uygulamada, bu skorları birleştirmeden önce normalleştirmek faydalı olacaktır.
        # Örneğin, Min-Max Scaler veya StandardScaler kullanılabilir.
        # Ağırlıklı ortalama
        final_scores = np.zeros(len(X))
        for name, score in scores.items():
            final_scores += self.weights[name] * score
        
        # Eşik değere göre sınıflandırma
        threshold = np.percentile(final_scores, 10)
        return (final_scores < threshold).astype(int)

Güvenlik İpuçları

  1. Feature Engineering: Domain bilgisini kullanarak anlamlı özellikler çıkarın
  2. Imbalanced Data: SMOTE veya undersampling teknikleri kullanın
  3. Concept Drift: Modelleri düzenli olarak yeniden eğitin
  4. False Positive Yönetimi: İş mantığına uygun eşik değerleri belirleyin

Uygulama Örneği: DDoS Saldırı Tespiti


# Örnek trafik verisi oluşturma
traffic_data = pd.DataFrame(np.random.rand(100, 5), columns=['packet_rate', 'unique_src_ips', 'syn_flag_ratio', 'avg_packet_size', 'port_scan_indicator'])
# Örnek ensemble_detector (daha önce tanımlanmış olmalı)
ensemble_detector = EnsembleAnomalyDetector() # Varsayımsal olarak burada tanımlandığını varsayıyoruz.
ensemble_detector.fit(traffic_data[['packet_rate', 'unique_src_ips', 'syn_flag_ratio', 'avg_packet_size', 'port_scan_indicator']])
alert_threshold = 0.5 # Alarm eşik değeri
def detect_ddos_attack(traffic_data):
    """DDoS saldırılarını tespit et"""
    
    # Özellikler
    features = [
        'packet_rate',
        'unique_src_ips',
        'syn_flag_ratio',
        'avg_packet_size',
        'port_scan_indicator'
    ]
    
    # Model tahminleri
    X = traffic_data[features]
    anomaly_scores = ensemble_detector.predict(X)
    
    # Zaman penceresi analizi
    window_size = 60  # 60 saniye
    rolling_mean = pd.Series(anomaly_scores).rolling(window_size).mean()
    
    # Alarm durumu
    if rolling_mean.iloc[-1] > alert_threshold:
        return True, "Potansiyel DDoS saldırısı tespit edildi!"
    
    return False, "Normal trafik"

Sonuç

Makine öğrenmesi, siber güvenlikte anomali tespiti için güçlü araçlar sunuyor. Ancak başarılı bir sistem için:

  • Doğru veri toplama ve özellik mühendisliği
  • Uygun model seçimi ve hiperparametre ayarlaması
  • Sürekli izleme ve model güncellemeleri
  • İş mantığıyla uyumlu değerlendirme metrikleri

kritik öneme sahip. Unutmayın, hiçbir model %100 doğruluk sağlayamaz, bu yüzden savunma derinliği (defense in depth) yaklaşımı benimsenmelidir.