همگامسازی دادهها بین دو سیستم پایگاهداده ناهمگن مانند PostgreSQL و MySQL یکی از چالشبرانگیزترین و در عین حال رایجترین نیازهای سازمانهای امروزی است. با گسترش معماریهای چندپایگاهدادهای و نیاز به استفاده از نقاط قوت هر سیستم، مسئله همگامسازی به موضوعی حیاتی تبدیل شده است. این مقاله به صورت جامع و با جزئیات فنی به بررسی راهکارهای مختلف برای حل این چالش میپردازد.
مقدمه: چرا همگامسازی پایگاهدادههای ناهمگن ضروری است؟
در دنیای امروز، سازمانها به دلایل مختلفی از پایگاهدادههای متفاوت استفاده میکنند. ممکن است یک شرکت به دلیل Legacy Systems از MySQL استفاده کند اما برای قابلیتهای پیشرفته PostgreSQL تمایل به مهاجرت داشته باشد. یا ممکن است بخشهای مختلف یک سازمان به دلایل فنی، عملکردی یا تاریخی از پایگاهدادههای متفاوت استفاده کنند. در چنین سناریوهایی، همگامسازی دادهها بین سیستمهای ناهمگن تبدیل به یک ضرورت میشود.
همگامسازی موفق نه تنها باعث یکپارچگی دادهها میشود، بلکه امکان استفاده از قابلیتهای منحصر به فرد هر سیستم پایگاهداده را فراهم میکند. برای مثال، ممکن است از MySQL برای تراکنشهای سریع و از PostgreSQL برای تحلیلهای پیچیده استفاده شود.
مرحله ۱: برنامهریزی و طراحی – پایهای ترین و مهمترین مرحله
تعیین منبع و مقصد (Source & Target)
اولین و مهمترین گام در فرآیند همگامسازی، تعیین منبع حقیقی (Source of Truth) است. این تصمیم تأثیر مستقیمی بر طراحی کل سیستم همگامسازی دارد:
همگامسازی یکطرفه: در این مدل، یک پایگاهداده به عنوان منبع اصلی تعیین میشود و پایگاهداده دیگر به عنوان replica عمل میکند. این مدل سادهتر است و برای سناریوهایی مناسب است که:
-
مهاجرت از یک پایگاهداده به دیگری در حال انجام است
-
یک سیستم Legacy به تدریج در حال جایگزینی است
-
نیاز به خواندن دادهها از چندین منبع وجود دارد اما نوشتن فقط در یک منبع انجام میشود
همگامسازی دوطرفه: این مدل پیچیدهتر است و نیاز به مکانیزمهای پیشرفته برای حل تضادها (Conflict Resolution) دارد. موارد استفاده:
-
سیستمهای توزیع شده با نوشتن در چندین نقطه
-
برنامههای کاربری که در مناطق جغرافیایی مختلف مستقر شدهاند
-
سناریوهای High Availability و Disaster Recovery
تعیین استراتژی همگامسازی
Full Load (بارگذاری کامل):
در این روش، در هر بار همگامسازی، تمام دادههای از source استخراج و در target جایگزین میشوند.
مزایا:
-
ساده در پیادهسازی
-
عدم نیاز به مکانیزم پیچیده ردیابی تغییرات
-
اطمینان از یکسان بودن کامل دادهها پس از هر sync
معایب:
-
مصرف پهنای باند بالا
-
زمان بر بودن به خصوص برای جداول بزرگ
-
فشار زیاد بر روی سیستم در زمان sync
Incremental Load (بارگذاری افزایشی) یا CDC – Change Data Capture:
در این روش، فقط دادههای تغییر کرده از آخرین همگامسازی منتقل میشوند.
مزایا:
-
کارایی بالا
-
مصرف کم پهنای باند
-
امکان همگامسازی تقریباً بلادرنگ
معایب:
-
پیچیدگی در پیادهسازی
-
نیاز به مکانیزم reliable برای ردیابی تغییرات
-
چالش در مدیریت حذفها (Deletes)
شناسایی و مدیریت موانع فنی (Technical Differences)
تفاوتهای بین PostgreSQL و MySQL چالشهای متعددی ایجاد میکند که باید به دقت مدیریت شوند:
نوع دادهها (Data Types):
-
نوع boolean: در MySQL به صورت TINYINT(1) با مقادیر 0 و 1 ذخیره میشود، در حالی که در PostgreSQL به صورت BOOL با مقادیر true/false
-
نوع datetime: MySQL از انواع DATETIME و TIMESTAMP استفاده میکند که هر کدام ویژگیهای خاص خود را دارند. PostgreSQL از TIMESTAMP با timezone و بدون timezone پشتیبانی میکند
-
نوع عددی: MySQL دارای انواع INT, BIGINT, FLOAT, DOUBLE است. PostgreSQL علاوه بر اینها انواع NUMERIC و DECIMAL با دقت دقیق را ارائه میدهد
-
نوع متن: MySQL دارای CHAR, VARCHAR, TEXT با محدودیتهای اندازه مختلف. PostgreSQL دارای انواع متن با قابلیتهای پیشرفتهتر است
Collation و Character Encoding:
-
اطمینان از استفاده از encoding یکسان (معمولاً UTF-8) در هر دو پایگاهداده ضروری است
-
تنظیمات Collation در MySQL و PostgreSQL متفاوت است و میتواند بر sorting و comparisons تأثیر بگذارد
حساسیت به حروف (Case Sensitivity):
-
نام جدولها و ستونها در PostgreSQL به طور پیشفرض به حروف کوچک و بزرگ حساس نیستند اما به صورت case-sensitive ذخیره میشوند
-
در MySQL، حساسیت به حروف بستگی به سیستم فایل زیرین و تنظیمات دارد
مکانیزم خودافزایی (Auto-Increment):
-
PostgreSQL از SEQUENCEها استفاده میکند که با SERIAL یا IDENTITY تعریف میشوند
-
MySQL از AUTO_INCREMENT استفاده میکند که رفتار متفاوتی دارد
تفاوتهای transaction و locking:
-
سطح ایزوله تراکنشها در دو سیستم ممکن است متفاوت باشد
-
مکانیزمهای قفلگذاری (locking) تفاوتهای قابل توجهی دارند
مرحله ۲: راهکارهای فنی عملی
راهکار ۱: استفاده از ابزارهای ETL/ELT
ابزارهای ETL (Extract, Transform, Load) برای همگامسازی دادهها بین سیستمهای ناهمگن بسیار مناسب هستند.
Apache Airflow:
Airflow یک پلتفرم منبع باز برای برنامهریزی و مانیتورینگ گردش کار است. برای همگامسازی میتوانید یک DAG (Directed Acyclic Graph) طراحی کنید:
from airflow import DAG from airflow.operators.python_operator import PythonOperator from datetime import datetime, timedelta from sync_module import mysql_to_postgres_sync default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2023, 1, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } dag = DAG( 'mysql_postgres_sync', default_args=default_args, description='Sync data from MySQL to PostgreSQL', schedule_interval=timedelta(hours=1), ) sync_task = PythonOperator( task_id='mysql_to_postgres_sync', python_callable=mysql_to_postgres_sync, dag=dag, )
Talend Open Studio:
Talend یک ابزار ETL گرافیکی است که connectors از پیش ساختهشده برای انواع پایگاهداده دارد. مزایای Talend:
-
رابط کاربری بصری برای طراحی transformationها
-
پشتیبانی از انواع گستردهای از پایگاهدادهها
-
قابلیت تولید کد و استقرار در محیطهای مختلف
-
مدیریت خطا و logging پیشرفته
Pentaho Data Integration (Kettle):
ابزار دیگری در دسته ETL که ویژگیهای مشابه Talend ارائه میدهد.
راهکار ۲: استفاده از سرویسهای ابری
AWS Database Migration Service (DMS):
سرویس مدیریت شده AWS برای迁移 و همگامسازی پایگاهدادهها
مزایا:
-
راهاندازی سریع و آسان
-
پشتیبانی از همگامسازی مداوم (CDC)
-
مدیریت خودکار infrastructure
-
monitoring و alerting یکپارچه
مراحل راهاندازی:
-
ایجاد replication instance
-
تعریف source و target endpoints
-
ایجاد replication task با تعیین mapping rules
-
راهاندازی و مانیتورینگ task
Google Cloud Dataflow:
سرویس پردازش دادههای جریان و batch در GCP که میتواند برای همگامسازی استفاده شود.
Azure Data Factory:
سرویس integration داده در Azure که قابلیتهای ETL و ELT ارائه میدهد.
راهکار ۳: Logical Replication (پیشرفته و کارآمد)
این روش برای همگامسازی بلادرنگ با تأخیر بسیار کم مناسب است.
برای PostgreSQL:
PostgreSQL از logical replication با استفاده از publication و subscription پشتیبانی میکند. همچنین میتوان از output pluginهای مانند wal2json برای استخراج تغییرات استفاده کرد.
-- ایجاد publication در PostgreSQL CREATE PUBLICATION my_publication FOR TABLE users, orders; -- ایجاد subscription در PostgreSQL دیگر (برای همگامسازی بین دو PostgreSQL) -- برای MySQL نیاز به یک مکانیزم custom داریم
برای MySQL:
MySQL از binary log (binlog) برای ضبط تغییرات استفاده میکند.
پیادهسازی custom application:
یک برنامه میانی میتواند تغییرات را از source خوانده و به target اعمال کند.
import mysql.connector from psycopg2 import connect import json from datetime import datetime class MySQLToPGReplicator: def __init__(self, mysql_config, pg_config): self.mysql_config = mysql_config self.pg_config = pg_config self.last_position = None def connect_to_mysql(self): return mysql.connector.connect(**self.mysql_config) def connect_to_postgres(self): return connect(**self.pg_config) def read_binlog_events(self): # پیادهسازی خواندن events از MySQL binlog pass def apply_to_postgres(self, events): # پیادهسازی اعمال تغییرات در PostgreSQL pass def start_replication(self): while True: events = self.read_binlog_events() if events: self.apply_to_postgres(events) self.save_position() time.sleep(0.1) # تنظیم interval بر اساس نیاز
راهکار ۴: اسکریپت نویسی با Python (انعطافپذیر و پرکاربرد)
اسکریپتنویسی با Python انعطافپذیری کامل را ارائه میدهد و برای همگامسازی دورهای مناسب است.
نمونه کاملتر برای همگامسازی افزایشی:
import pandas as pd from sqlalchemy import create_engine, text from datetime import datetime, timedelta import logging import sys # تنظیمات logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('sync_log.log'), logging.StreamHandler(sys.stdout) ] ) class DatabaseSynchronizer: def __init__(self, mysql_conn_str, postgres_conn_str): self.mysql_engine = create_engine(mysql_conn_str) self.postgres_engine = create_engine(postgres_conn_str) self.sync_state = self.load_sync_state() def load_sync_state(self): """بارگذاری آخرین وضعیت همگامسازی""" try: with self.postgres_engine.connect() as conn: result = conn.execute(text("SELECT last_sync_time FROM sync_metadata WHERE table_name = 'target_table'")) return result.scalar() or datetime(1970, 1, 1) except: return datetime(1970, 1, 1) def save_sync_state(self, new_sync_time): """ذخیره زمان آخرین همگامسازی""" with self.postgres_engine.connect() as conn: conn.execute(text(""" INSERT INTO sync_metadata (table_name, last_sync_time) VALUES ('target_table', :sync_time) ON CONFLICT (table_name) DO UPDATE SET last_sync_time = EXCLUDED.last_sync_time """), {'sync_time': new_sync_time}) conn.commit() def get_changed_data(self, table_name, last_sync): """دریافت دادههای تغییر کرده از MySQL""" query = text(f""" SELECT * FROM {table_name} WHERE last_modified > :last_sync OR created > :last_sync """) with self.mysql_engine.connect() as conn: df = pd.read_sql_query(query, conn, params={'last_sync': last_sync}) return df def apply_changes_to_postgres(self, df, table_name): """اعمال تغییرات در PostgreSQL""" if df.empty: logging.info("No changes to sync") return # تبدیل نوع دادهها اگر لازم باشد type_mappings = { 'tinyint': 'boolean', 'datetime': 'timestamp' } # ذخیره دادهها در PostgreSQL with self.postgres_engine.begin() as conn: # استفاده از روش UPSERT برای به روزرسانی یا درج رکوردها for _, row in df.iterrows(): # ساختار dynamic query بر اساس ستونهای موجود columns = ', '.join(row.index) placeholders = ', '.join([f':{col}' for col in row.index]) update_clause = ', '.join([f"{col} = EXCLUDED.{col}" for col in row.index if col != 'id']) upsert_query = text(f""" INSERT INTO {table_name} ({columns}) VALUES ({placeholders}) ON CONFLICT (id) DO UPDATE SET {update_clause} """) conn.execute(upsert_query, row.to_dict()) logging.info(f"Applied {len(df)} changes to {table_name}") def sync_table(self, table_name): """همگامسازی یک جدول""" try: # دریافت دادههای تغییر کرده changed_data = self.get_changed_data(table_name, self.sync_state) # اعمال تغییرات self.apply_changes_to_postgres(changed_data, table_name) # به روزرسانی وضعیت همگامسازی new_sync_time = datetime.now() self.save_sync_state(new_sync_time) self.sync_state = new_sync_time logging.info(f"Sync completed for {table_name} at {new_sync_time}") except Exception as e: logging.error(f"Error syncing {table_name}: {str(e)}") raise def sync_all_tables(self): """همگامسازی تمام جداول تعریف شده""" tables = ['users', 'orders', 'products'] # لیست جداول برای همگامسازی for table in tables: self.sync_table(table) def close_connections(self): """بستن اتصالات""" self.mysql_engine.dispose() self.postgres_engine.dispose() # استفاده از کلاس if __name__ == "__main__": mysql_conn_str = 'mysql+mysqlconnector://user:password@mysql_host/mysql_db' postgres_conn_str = 'postgresql+psycopg2://user:password@postgresql_host/postgresql_db' synchronizer = DatabaseSynchronizer(mysql_conn_str, postgres_conn_str) try: synchronizer.sync_all_tables() finally: synchronizer.close_connections()
مدیریت حذفها (Deletes) در همگامسازی افزایشی:
مدیریت حذفها یکی از چالشبرانگیزترین بخشهای همگامسازی افزایشی است. چند راهکار:
-
Soft Delete: استفاده از ستون is_deleted به جای حذف فیزیکی
-
Deletion Log Table: ثبت حذفها در یک جدول جداگانه
-
Periodic Full Sync: انجام همگامسازی کامل دورهای برای پاکسازی رکوردهای حذف شده
def track_deletes(self): """ردیابی رکوردهای حذف شده""" # در MySQL، با استفاده از triggers حذفها را در جدول جداگانه ثبت کنید pass def apply_deletes(self): """اعمال حذفها در مقصد""" with self.mysql_engine.connect() as conn: deletes = pd.read_sql_query("SELECT id FROM deletion_log WHERE applied = 0", conn) if not deletes.empty: with self.postgres_engine.begin() as conn: for _, row in deletes.iterrows(): conn.execute(text("DELETE FROM target_table WHERE id = :id"), {'id': row['id']}) # علامت گذاری حذفها به عنوان applied conn.execute(text("UPDATE deletion_log SET applied = 1 WHERE id IN :ids"), {'ids': tuple(deletes['id'].tolist())})
مرحله ۳: یکسانسازی طرحواره (Schema Harmonization)
ایجاد Mapping Document جامع
یک mapping document دقیق برای تبدیل انواع داده و ساختارها ضروری است:
ستون در MySQL | نوع داده در MySQL | نوع داده معادل در PostgreSQL | توضیحات تبدیل |
---|---|---|---|
is_active | TINYINT(1) | BOOLEAN | 1 -> true, 0 -> false |
created_at | DATETIME | TIMESTAMP | تبدیل مستقیم |
price | DECIMAL(10,2) | NUMERIC(10,2) | تبدیل مستقیم |
json_data | JSON | JSONB | تبدیل مستقیم |
text_content | LONGTEXT | TEXT | تبدیل مستقیم |
استفاده از ابزارهای تبدیل طرحواره
pgLoader:
ابزار قدرتمند برای انتقال داده از MySQL به PostgreSQL با قابلیت تبدیل انواع داده
# نصب pgLoader sudo apt-get install pgloader # اجرای انتقال pgloader mysql://user:password@mysql_host/db_name postgresql://user:password@postgresql_host/db_name
اسکریپت تبدیل طرحواره با Python:
def generate_schema_mapping(mysql_schema, postgres_schema): """تولید mapping بین طرحوارهها""" type_mapping = { 'tinyint(1)': 'boolean', 'datetime': 'timestamp', 'longtext': 'text', 'mediumtext': 'text', 'varchar(255)': 'varchar(255)', 'int(11)': 'integer', 'bigint(20)': 'bigint', 'double': 'double precision', 'decimal(10,2)': 'numeric(10,2)' } schema_diff = {} for table in mysql_schema.tables: if table not in postgres_schema.tables: schema_diff[table] = {'action': 'create', 'columns': {}} continue for column in mysql_schema.tables[table].columns: pg_column = postgres_schema.tables[table].columns.get(column.name) if not pg_column: schema_diff.setdefault(table], {'action': 'alter', 'columns': {}}) schema_diff[table]['columns'][column.name] = { 'action': 'add', 'type': type_mapping.get(column.type, column.type) } elif pg_column.type != type_mapping.get(column.type, column.type): schema_diff.setdefault(table], {'action': 'alter', 'columns': {}}) schema_diff[table]['columns'][column.name] = { 'action': 'modify', 'old_type': pg_column.type, 'new_type': type_mapping.get(column.type, column.type) } return schema_diff
مرحله ۴: مانیتورینگ، مدیریت خطا و اطمینان از قابلیت اطمینان
پیادهسازی مانیتورینگ جامع
مانیتورینگ فرآیند همگامسازی برای اطمینان از صحت و تداوم آن ضروری است.
معیارهای کلیدی برای مانیتورینگ:
-
تاخیر همگامسازی (Sync Lag)
-
تعداد رکوردهای پردازش شده در واحد زمان
-
نرخ خطا (Error Rate)
-
مدت زمان همگامسازی
-
استفاده از منابع (CPU, Memory, Network)
پیادهسازی با Prometheus و Grafana:
from prometheus_client import Counter, Gauge, start_http_server import time # تعریف metrics SYNC_RECORDS_COUNTER = Counter('sync_records_total', 'Total records synced', ['table']) SYNC_DURATION_GAUGE = Gauge('sync_duration_seconds', 'Sync duration in seconds') SYNC_LAG_GAUGE = Gauge('sync_lag_seconds', 'Sync lag in seconds') SYNC_ERROR_COUNTER = Counter('sync_errors_total', 'Total sync errors') class MonitoredSynchronizer(DatabaseSynchronizer): def sync_table(self, table_name): start_time = time.time() try: super().sync_table(table_name) duration = time.time() - start_time SYNC_DURATION_GAUGE.set(duration) SYNC_RECORDS_COUNTER.labels(table=table_name).inc(self.record_count) except Exception as e: SYNC_ERROR_COUNTER.inc() logging.error(f"Sync failed for {table_name}: {str(e)}") raise
مدیریت خطا و بازیابی (Error Handling and Recovery)
الگوی Retry با Exponential Backoff:
import tenacity from tenacity import retry, stop_after_attempt, wait_exponential @retry( stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=10), retry=retry_if_exception_type((NetworkError, DatabaseError)) ) def sync_with_retry(self, table_name): """همگامسازی با قابلیت retry""" return self.sync_table(table_name)
مکانیزم Dead Letter Queue برای مدیریت خطاهای مداوم:
def handle_sync_error(self, table_name, record_id, error): """مدیریت خطاهای همگامسازی""" error_record = { 'table_name': table_name, 'record_id': record_id, 'error_message': str(error), 'timestamp': datetime.now(), 'retry_count': 0 } # ذخیره در جدول error queue with self.postgres_engine.begin() as conn: conn.execute( text("INSERT INTO sync_error_queue (table_name, record_id, error_message, timestamp, retry_count) VALUES (:table_name, :record_id, :error_message, :timestamp, :retry_count)"), error_record )
جمعبندی و انتخاب راهکار مناسب
مقایسه جامع روشهای مختلف
روش | بهترین Use Case | مزایا | معایب | هزینه | پیچیدگی |
---|---|---|---|---|---|
ابزارهای ETL/ELT (Airflow, Talend) | همگامسازی مداوم و قابل اطمینان | خودکار، قابل monitoring، مدیریت خطا | پیچیدگی راهاندازی اولیه | متوسط تا بالا | بالا |
اسکریپت نویسی (Python) | همگامسازی دورهای، پروژههای خاص | انعطافپذیری کامل، هزینه پایین شروع | نیاز به توسعه و نگهداری داخلی | پایین | متوسط |
سرویسهای ابری (AWS DMS) | همگامسازی در زیرساخت ابری | مدیریت کامل توسط cloud provider، یکپارچه با سایر سرویسها | هزینه بر اساس usage، وابستگی به cloud vendor | متغیر (پرداخت بر اساس usage) | پایین |
Logical Replication | همگامسازی بلادرنگ با تاخیر بسیار کم | کارایی بسیار بالا، تاخیر کم | پیچیدگی implementation بسیار بالا | بالا | بسیار بالا |
توصیههای نهایی بر اساس سناریوهای مختلف
-
مهاجرت تدریجی از MySQL به PostgreSQL:
-
شروع با اسکریپتهای Python برای همگامسازی دورهای
-
استفاده از pgLoader برای انتقال اولیه دادهها
-
پیادهسازی همگامسازی افزایشی با مکانیزم CDC
-
-
معماری چندپایگاهدادهای با خواندن/نوشتن از هر دو سیستم:
-
استفاده از همگامسازی دوطرفه
-
پیادهسازی مکانیزم حل تضاد (Conflict Resolution)
-
استفاده از ابزارهای ETL پیشرفته مانند Talend
-
-
همگامسازی بلادرنگ برای برنامههای حیاتی:
-
استفاده از logical replication
-
پیادهسازی سرویس میانی برای مدیریت replication
-
مانیتورینگ دقیق و سیستم هشدار سریع
-
-
همگامسازی دورهای برای گزارشگیری و analytics:
-
استفاده از اسکریپتهای Python ساده
-
همگامسازی شبانه یا در ساعات کمبار
-
تمرکز بر روی تبدیل دادهها برای نیازهای analytics
-
چکلیست نهایی برای پیادهسازی موفق
-
تعیین منبع حقیقی و جهت همگامسازی
-
تهیه mapping document برای انواع داده و ساختارها
-
انتخاب استراتژی همگامسازی (کامل/افزایشی)
-
طراحی مکانیزم مدیریت خطا و بازیابی
-
پیادهسازی مانیتورینگ و alerting
-
تست کامل با دادههای واقعی
-
مستندسازی فرآیند و آموزش تیم
-
برنامهریزی برای نگهداری و بهروزرسانی
همگامسازی بین PostgreSQL و MySQL اگرچه چالشبرانگیز است، اما با برنامهریزی دقیق و انتخاب ابزارهای مناسب کاملاً قابل انجام است. کلید موفقیت در درک تفاوتهای بین دو سیستم و طراحی یک راهحل انعطافپذیر و قابل اطمینان است.