Siber güvenlik alanında makine öğrenmesi, özellikle anomali tespiti için güçlü bir araç haline geldi. Bu yazıda, ağ trafiğinde ve sistem davranışlarında anormallikleri tespit etmek için kullanılan ML tekniklerini inceleyeceğiz.
Anomali Tespiti Nedir?
Anomali tespiti, normal davranış kalıplarından sapmaları belirleme sürecidir. Siber güvenlikte bu, potansiyel tehditleri erken aşamada tespit etmemizi sağlar.
Veri Hazırlığı
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
# Veri yükleme
def load_network_data(file_path):
"""Ağ trafiği verilerini yükle ve ön işle"""
data = pd.read_csv(file_path)
# Özellik mühendisliği
data['packet_rate'] = data['packets'] / data['duration']
data['byte_rate'] = data['bytes'] / data['duration']
return data
# Örnek veri oluşturma (gerçek uygulamada kendi verilerinizi yükleyeceksiniz)
data = pd.DataFrame(np.random.rand(100, 5), columns=['packets', 'bytes', 'duration', 'src_ip', 'dst_ip'])
data['packet_rate'] = data['packets'] / data['duration']
data['byte_rate'] = data['bytes'] / data['duration']
features = data[['packet_rate', 'byte_rate', 'packets', 'bytes', 'duration']]
# Özellikleri normalize et
scaler = StandardScaler()
features_normalized = scaler.fit_transform(features)
# Eğitim ve test setlerine ayırma (gözetimsiz öğrenme için y_train/y_test genellikle kullanılmaz, ama tutarlılık için eklenmiştir)
X_train, X_test, _, _ = train_test_split(features_normalized, np.zeros(len(features_normalized)), test_size=0.3, random_state=42)
Unsupervised Learning Yaklaşımları
1. Isolation Forest
Isolation Forest, anomalileri izole etmenin normal verilere göre daha kolay olduğu prensibine dayanır:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
# Veri yükleme
def load_network_data(file_path):
"""Ağ trafiği verilerini yükle ve ön işle"""
data = pd.read_csv(file_path)
# Özellik mühendisliği
data['packet_rate'] = data['packets'] / data['duration']
data['byte_rate'] = data['bytes'] / data['duration']
return data
# Örnek veri oluşturma (gerçek uygulamada kendi verilerinizi yükleyeceksiniz)
data = pd.DataFrame(np.random.rand(100, 5), columns=['packets', 'bytes', 'duration', 'src_ip', 'dst_ip'])
data['packet_rate'] = data['packets'] / data['duration']
data['byte_rate'] = data['bytes'] / data['duration']
features = data[['packet_rate', 'byte_rate', 'packets', 'bytes', 'duration']]
# Özellikleri normalize et
scaler = StandardScaler()
features_normalized = scaler.fit_transform(features)
# Eğitim ve test setlerine ayırma (gözetimsiz öğrenme için y_train/y_test genellikle kullanılmaz, ama tutarlılık için eklenmiştir)
X_train, X_test, _, _ = train_test_split(features_normalized, np.zeros(len(features_normalized)), test_size=0.3, random_state=42)
# Model oluşturma
iso_forest = IsolationForest(
contamination=0.1, # Beklenen anomali oranı
random_state=42,
n_estimators=100
)
# Modeli eğit
iso_forest.fit(X_train)
# Tahmin yap (-1: anomali, 1: normal)
predictions_iso = iso_forest.predict(X_test)
anomaly_scores_iso = iso_forest.decision_function(X_test)
2. Local Outlier Factor (LOF) ile Anomali Tespiti
LOF, bir veri noktasının komşularına göre ne kadar izole olduğunu ölçer. Yoğunluk tabanlı bir anomali tespit algoritmasıdır:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
# Veri yükleme
def load_network_data(file_path):
"""Ağ trafiği verilerini yükle ve ön işle"""
data = pd.read_csv(file_path)
# Özellik mühendisliği
data['packet_rate'] = data['packets'] / data['duration']
data['byte_rate'] = data['bytes'] / data['duration']
return data
# Örnek veri oluşturma (gerçek uygulamada kendi verilerinizi yükleyeceksiniz)
data = pd.DataFrame(np.random.rand(100, 5), columns=['packets', 'bytes', 'duration', 'src_ip', 'dst_ip'])
data['packet_rate'] = data['packets'] / data['duration']
data['byte_rate'] = data['bytes'] / data['duration']
features = data[['packet_rate', 'byte_rate', 'packets', 'bytes', 'duration']]
# Özellikleri normalize et
scaler = StandardScaler()
features_normalized = scaler.fit_transform(features)
# Eğitim ve test setlerine ayırma (gözetimsiz öğrenme için y_train/y_test genellikle kullanılmaz, ama tutarlılık için eklenmiştir)
X_train, X_test, _, _ = train_test_split(features_normalized, np.zeros(len(features_normalized)), test_size=0.3, random_state=42)
# Model oluşturma
lof = LocalOutlierFactor(
n_neighbors=20, # Komşu sayısı
contamination=0.1, # Beklenen anomali oranı
novelty=True # Yeni veri üzerinde tahmin yapmak için True olmalı
)
# Modeli eğit (fit_predict kullanmayın, novelty=True olduğunda fit ve predict ayrı kullanılır)
lof.fit(X_train)
# Tahmin yap (-1: anomali, 1: normal)
predictions_lof = lof.predict(X_test)
anomaly_scores_lof = lof.decision_function(X_test) # LOF için decision_function değeri negatif, daha düşük değerler daha anormaldir.
One-Class SVM
Normal verilerin etrafında bir sınır oluşturur:
from sklearn.svm import OneClassSVM
# One-Class SVM modeli
oc_svm = OneClassSVM(
kernel='rbf',
gamma='scale',
nu=0.1 # Anomali oranı üst sınırı
)
oc_svm.fit(X_train)
predictions_ocsvm = oc_svm.predict(X_test)
Gerçek Zamanlı Anomali Tespiti
Streaming veri için online learning yaklaşımı:
import river
from river import anomaly
from river import preprocessing
class RealTimeAnomalyDetector:
def __init__(self):
self.scaler = preprocessing.StandardScaler()
self.model = anomaly.HalfSpaceTrees(
n_trees=10,
height=8,
window_size=250,
seed=42
)
def update_and_predict(self, x):
# Veriyi normalize et
x_scaled = self.scaler.learn_one(x).transform_one(x)
# Anomali skoru
score = self.model.score_one(x_scaled)
# Modeli güncelle
self.model.learn_one(x_scaled)
return score
# Kullanım
detector = RealTimeAnomalyDetector()
# Gerçek zamanlı veri akışı (örnek bir liste olarak gösterilmiştir)
stream = [{"feature1": 0.5, "feature2": 0.2}, {"feature1": 0.6, "feature2": 0.3}, {"feature1": 0.1, "feature2": 0.9}] # Örnek veri
threshold = 0.7 # Anomali eşik değeri
def alert(message): print(message) # Basit bir alarm fonksiyonu
for data_point in stream:
anomaly_score = detector.update_and_predict(data_point)
if anomaly_score > threshold:
alert("Anomali tespit edildi!")
Özellik Seçimi ve Boyut İndirgeme
from sklearn.decomposition import PCA
# PCA ile boyut indirgeme
pca = PCA(n_components=0.95) # %95 varyansı koru
X_reduced = pca.fit_transform(X_train)
print(f"Orijinal boyut: {X_train.shape[1]}")
print(f"İndirgenmiş boyut: {X_reduced.shape[1]}")
Model Değerlendirme
from sklearn.metrics import classification_report, roc_auc_score
import matplotlib.pyplot as plt
def evaluate_anomaly_detector(y_true, y_pred, scores):
"""Anomali tespit modelini değerlendir"""
# Sınıflandırma raporu
print(classification_report(y_true, y_pred))
# ROC AUC
roc_auc = roc_auc_score(y_true, scores)
print(f"ROC AUC Score: {roc_auc:.4f}")
# Skor dağılımı
plt.figure(figsize=(10, 6))
plt.hist(scores[y_true == 0], bins=50, alpha=0.7, label='Normal')
plt.hist(scores[y_true == 1], bins=50, alpha=0.7, label='Anomali')
plt.xlabel('Anomali Skoru')
plt.ylabel('Frekans')
plt.legend()
plt.title('Anomali Skorlarının Dağılımı')
plt.show()
Hibrit Yaklaşım: Ensemble Modeller
class EnsembleAnomalyDetector:
def __init__(self):
self.models = {
'isolation_forest': IsolationForest(contamination=0.1),
'one_class_svm': OneClassSVM(nu=0.1),
'lof': LocalOutlierFactor(novelty=True, contamination=0.1)
}
self.weights = {'isolation_forest': 0.4, 'one_class_svm': 0.3, 'lof': 0.3}
def fit(self, X):
for name, model in self.models.items():
model.fit(X)
def predict(self, X):
scores = {}
for name, model in self.models.items():
if hasattr(model, 'decision_function'):
scores[name] = model.decision_function(X)
else:
scores[name] = model.score_samples(X)
# Not: Farklı modellerin skorları farklı ölçeklerde olabilir.
# Gerçek bir uygulamada, bu skorları birleştirmeden önce normalleştirmek faydalı olacaktır.
# Örneğin, Min-Max Scaler veya StandardScaler kullanılabilir.
# Ağırlıklı ortalama
final_scores = np.zeros(len(X))
for name, score in scores.items():
final_scores += self.weights[name] * score
# Eşik değere göre sınıflandırma
threshold = np.percentile(final_scores, 10)
return (final_scores < threshold).astype(int)
Güvenlik İpuçları
- Feature Engineering: Domain bilgisini kullanarak anlamlı özellikler çıkarın
- Imbalanced Data: SMOTE veya undersampling teknikleri kullanın
- Concept Drift: Modelleri düzenli olarak yeniden eğitin
- False Positive Yönetimi: İş mantığına uygun eşik değerleri belirleyin
Uygulama Örneği: DDoS Saldırı Tespiti
# Örnek trafik verisi oluşturma
traffic_data = pd.DataFrame(np.random.rand(100, 5), columns=['packet_rate', 'unique_src_ips', 'syn_flag_ratio', 'avg_packet_size', 'port_scan_indicator'])
# Örnek ensemble_detector (daha önce tanımlanmış olmalı)
ensemble_detector = EnsembleAnomalyDetector() # Varsayımsal olarak burada tanımlandığını varsayıyoruz.
ensemble_detector.fit(traffic_data[['packet_rate', 'unique_src_ips', 'syn_flag_ratio', 'avg_packet_size', 'port_scan_indicator']])
alert_threshold = 0.5 # Alarm eşik değeri
def detect_ddos_attack(traffic_data):
"""DDoS saldırılarını tespit et"""
# Özellikler
features = [
'packet_rate',
'unique_src_ips',
'syn_flag_ratio',
'avg_packet_size',
'port_scan_indicator'
]
# Model tahminleri
X = traffic_data[features]
anomaly_scores = ensemble_detector.predict(X)
# Zaman penceresi analizi
window_size = 60 # 60 saniye
rolling_mean = pd.Series(anomaly_scores).rolling(window_size).mean()
# Alarm durumu
if rolling_mean.iloc[-1] > alert_threshold:
return True, "Potansiyel DDoS saldırısı tespit edildi!"
return False, "Normal trafik"
Sonuç
Makine öğrenmesi, siber güvenlikte anomali tespiti için güçlü araçlar sunuyor. Ancak başarılı bir sistem için:
- Doğru veri toplama ve özellik mühendisliği
- Uygun model seçimi ve hiperparametre ayarlaması
- Sürekli izleme ve model güncellemeleri
- İş mantığıyla uyumlu değerlendirme metrikleri
kritik öneme sahip. Unutmayın, hiçbir model %100 doğruluk sağlayamaz, bu yüzden savunma derinliği (defense in depth) yaklaşımı benimsenmelidir.