从零开始实现分布式服务系统

发布时间:2023年12月18日

开发前言

分布式系统具有高可靠性、高性能、可扩展性、灵活性、数据共享和地理分布等优点,使其在各种应用场景下都具有巨大的优势。当然,分布式系统的实现复杂度要高于单体系统🫠

项目代码使用纯粹的Go语言标准库实现,不借用任何其它第三方库😁

我是醉墨居士,废话不多说,我们现在开始吧🤗

分布式模型

  • Hub & Spoke模型
  • 优点:集中管理,安全性,降低成本
  • 缺点:单点故障,延迟,有限的扩展性

在这里插入图片描述

  • Peer to Peer模型
  • 优点:去中心化,高度可扩展性,资源共享
  • 缺点:管理复杂性,安全性,性能问题

在这里插入图片描述

  • Message Queues模型
  • 优点:解耦合,异步处理,可靠性
  • 缺点:系统复杂度,准确性,消息顺序

在这里插入图片描述

我们将要开发的分布式系统将会取其精华,去其糟粕,采用上述模型的混合模式😎

系统图解

在这里插入图片描述

注册中心模块

  • 注册信息
package registry

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// Registration describes one service instance as exchanged (JSON-encoded)
// between services and the registry.
type Registration struct {
	// ServiceName is the logical name under which the instance is registered.
	ServiceName string
	// ServiceAddr is the instance address; it is concatenated after "http://"
	// when the instance is contacted, so it is expected to be host:port.
	ServiceAddr string
	// RequiredServices lists the ServiceName values this instance depends on
	// and wants to be told about.
	RequiredServices []string
}

// buildRegistration reads reader to the end and decodes the content as a
// JSON-encoded Registration. The reader is closed before the function
// returns, whether or not decoding succeeds.
func buildRegistration(reader io.ReadCloser) (*Registration, error) {
	defer reader.Close()

	payload, readErr := io.ReadAll(reader)
	if readErr != nil {
		return nil, readErr
	}

	var decoded Registration
	if decodeErr := json.Unmarshal(payload, &decoded); decodeErr != nil {
		return nil, decodeErr
	}
	return &decoded, nil
}

// buildServiceInfo reads reader to the end and splits the content at the
// first space into exactly two fields. Input without a space is rejected
// with an error. The reader is closed before the function returns.
func buildServiceInfo(reader io.ReadCloser) ([]string, error) {
	defer reader.Close()

	raw, readErr := io.ReadAll(reader)
	if readErr != nil {
		return nil, readErr
	}

	head, tail, found := strings.Cut(string(raw), " ")
	if !found {
		// Without a separator the whole input counts as a single field.
		return nil, fmt.Errorf("Parse service failed with length %d", 1)
	}
	return []string{head, tail}, nil
}
  • 注册信息表
package registry

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net/http"
	"sync"
)

// serviceTable is a lock-protected index of registered service instances.
type serviceTable struct {
	// serviceInfos maps a service name to the registrations stored under it.
	serviceInfos map[string][]*Registration
	// lock guards serviceInfos.
	lock *sync.RWMutex
}

// newServiceTable returns an empty table with its map and lock initialised.
func newServiceTable() *serviceTable {
	table := new(serviceTable)
	table.serviceInfos = map[string][]*Registration{}
	table.lock = &sync.RWMutex{}
	return table
}

// parseServiceInfos reads a JSON object of the form
// {serviceName: [Registration, ...]} from reader and merges it into the
// table, replacing the entry of every service name present in the payload.
//
// The reader is always closed. A read or decode error is returned as is; an
// error from Close is reported only when nothing else failed, so it can no
// longer hide a decoding failure. The table is left untouched when decoding
// fails.
func (t *serviceTable) parseServiceInfos(reader io.ReadCloser) (err error) {
	defer func() {
		if closeErr := reader.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	data, err := io.ReadAll(reader)
	if err != nil {
		return err
	}

	// Decode into a scratch map first: decoding straight into the shared map
	// could leave it half-updated on error, and a JSON null would set it to
	// nil, making later writes panic.
	incoming := make(map[string][]*Registration)
	if err = json.Unmarshal(data, &incoming); err != nil {
		return err
	}

	t.lock.Lock()
	defer t.lock.Unlock()
	if t.serviceInfos == nil {
		t.serviceInfos = make(map[string][]*Registration, len(incoming))
	}
	for name, registrations := range incoming {
		t.serviceInfos[name] = registrations
	}
	return nil
}

// buildRequiredServiceInfos returns, for every name listed in
// registration.RequiredServices, the registrations currently stored under
// that name. Names with no stored entry map to a nil slice. The returned
// slices are the table's own slices, not copies.
func (t *serviceTable) buildRequiredServiceInfos(registration *Registration) map[string][]*Registration {
	required := registration.RequiredServices
	result := make(map[string][]*Registration, len(required))

	t.lock.RLock()
	defer t.lock.RUnlock()

	for i := range required {
		name := required[i]
		result[name] = t.serviceInfos[name]
	}
	return result
}

// notify tells every registered instance that lists registration.ServiceName
// among its RequiredServices that the given registration was added
// (http.MethodPost) or removed (http.MethodDelete). The registration is sent
// JSON-encoded to http://<instance address>/services.
//
// It returns an error for any other method or when the registration cannot
// be encoded. Delivery is best effort: a failure to reach one instance is
// logged and does not stop the remaining notifications.
func (t *serviceTable) notify(method string, registration *Registration) error {
	if method != http.MethodPost && method != http.MethodDelete {
		return fmt.Errorf("Method not allowed with method: %s", method)
	}

	data, err := json.Marshal(registration)
	if err != nil {
		return err
	}

	// Collect the recipients under the read lock and release it before any
	// network I/O, so a slow or unreachable peer cannot block writers.
	t.lock.RLock()
	var targets []string
	for _, registrations := range t.serviceInfos {
		for _, reg := range registrations {
			for _, requiredServiceName := range reg.RequiredServices {
				if requiredServiceName == registration.ServiceName {
					targets = append(targets, reg.ServiceAddr)
					break // one notification per instance is enough
				}
			}
		}
	}
	t.lock.RUnlock()

	for _, addr := range targets {
		target := "http://" + addr + "/services"
		req, err := http.NewRequest(method, target, bytes.NewReader(data))
		if err != nil {
			log.Printf("notify %s: %v\n", target, err)
			continue
		}
		log.Println("update url: ", target)
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			log.Printf("notify %s: %v\n", target, err)
			continue
		}
		// The body must be closed or the underlying connection is leaked.
		resp.Body.Close()
	}

	return nil
}

// add stores registration under its ServiceName, keeping any instances
// already registered under the same name.
func (t *serviceTable) add(registration *Registration) {
	t.lock.Lock()
	defer t.lock.Unlock()

	log.Printf("Service table add %s with address %s\n", registration.ServiceName, registration.ServiceAddr)

	// append returns a new slice header that has to be written back to the
	// map; appending to a local copy would silently drop every instance
	// after the first one. Appending to the nil slice of a missing key
	// creates the entry.
	name := registration.ServiceName
	t.serviceInfos[name] = append(t.serviceInfos[name], registration)
}

// remove deletes every instance stored under registration.ServiceName whose
// address equals registration.ServiceAddr. The map key itself is deleted
// once its last instance is gone. Unknown services are ignored.
func (t *serviceTable) remove(registration *Registration) {
	t.lock.Lock()
	defer t.lock.Unlock()

	log.Printf("Service table remove %s with address %s\n", registration.ServiceName, registration.ServiceAddr)

	registrations, ok := t.serviceInfos[registration.ServiceName]
	if !ok {
		return
	}

	// Build the survivors in a fresh slice and write it back to the map.
	// Shrinking only a local slice header leaves the map entry unchanged,
	// and filtering in place would disturb slices handed out earlier.
	kept := make([]*Registration, 0, len(registrations))
	for _, reg := range registrations {
		if reg.ServiceAddr != registration.ServiceAddr {
			kept = append(kept, reg)
		}
	}

	if len(kept) == 0 {
		delete(t.serviceInfos, registration.ServiceName)
		return
	}
	t.serviceInfos[registration.ServiceName] = kept
}

// get returns one instance registered under serviceName, picked at random,
// or nil when no instance is known.
func (t *serviceTable) get(serviceName string) *Registration {
	t.lock.RLock()
	defer t.lock.RUnlock()

	regs := t.serviceInfos[serviceName]
	if len(regs) == 0 {
		// rand.Intn panics for n <= 0, so an unknown or empty service must
		// be handled before picking an index.
		return nil
	}
	return regs[rand.Intn(len(regs))]
}
  • 注册服务
package registry

import (
	"encoding/json"
	"net/http"
	"time"
)

const (
	// serviceName is the display name of the registry itself.
	// NOTE(review): it is not referenced by the code shown in this article.
	serviceName = "Registry Service"
	// serviceAddr is the fixed address the registry listens on and that
	// clients use to reach it.
	serviceAddr = "127.0.0.1:20000"
)

// RegistryService is the registry server: it keeps the table of registered
// instances and periodically probes them.
type RegistryService struct {
	// serviceInfos is the table of registered instances.
	serviceInfos *serviceTable
	// heartBeatWorkerNumber is the number of probing goroutines heartBeat starts.
	heartBeatWorkerNumber int
	// heartBeatAttempCount is how many probes an instance may fail before it
	// is unregistered.
	heartBeatAttempCount int
	// heartBeatAttempDuration is the pause after a failed probe.
	heartBeatAttempDuration time.Duration
	// heartBeatCheckDuration is the pause between two probing rounds.
	heartBeatCheckDuration time.Duration
}

// Default returns a RegistryService with 3 heartbeat workers, 3 attempts per
// instance, one second between attempts and 30 seconds between rounds.
func Default() *RegistryService {
	const (
		workers  = 3
		attempts = 3
	)
	return New(workers, attempts, time.Second, 30*time.Second)
}

// New returns a RegistryService with an empty service table and the given
// heartbeat settings.
func New(heartBeatWorkerNumber, heartBeatAttempCount int, heartBeatAttempDuration, heartBeatCheckDuration time.Duration) *RegistryService {
	svc := new(RegistryService)
	svc.serviceInfos = newServiceTable()
	svc.heartBeatWorkerNumber = heartBeatWorkerNumber
	svc.heartBeatAttempCount = heartBeatAttempCount
	svc.heartBeatAttempDuration = heartBeatAttempDuration
	svc.heartBeatCheckDuration = heartBeatCheckDuration
	return svc
}

// Run starts the heartbeat loop in the background, installs the /services
// handler on the default mux and serves HTTP on serviceAddr. It blocks until
// the server stops and returns the error from http.ListenAndServe.
//
// /services accepts:
//   - POST: registers the JSON-encoded Registration in the body and answers
//     200 with a JSON object mapping each required service name to its
//     currently known instances.
//   - DELETE: unregisters the JSON-encoded Registration in the body.
//
// A body that cannot be decoded yields 400, a failure while registering or
// unregistering yields 500, and any other method yields 405.
func (s *RegistryService) Run() error {
	go s.heartBeat()

	http.HandleFunc("/services", func(w http.ResponseWriter, r *http.Request) {
		switch r.Method {
		case http.MethodPost:
			registration, err := buildRegistration(r.Body)
			if err != nil {
				w.WriteHeader(http.StatusBadRequest)
				return
			}

			if err = s.regist(registration); err != nil {
				w.WriteHeader(http.StatusInternalServerError)
				return
			}

			serviceInfos := s.serviceInfos.buildRequiredServiceInfos(registration)
			data, err := json.Marshal(&serviceInfos)
			if err != nil {
				w.WriteHeader(http.StatusInternalServerError)
				return
			}

			w.Header().Set("Content-Type", "application/json")
			w.WriteHeader(http.StatusOK)
			// A failed write means the client went away; there is nothing
			// left to report to it.
			_, _ = w.Write(data)

		case http.MethodDelete:
			registration, err := buildRegistration(r.Body)
			if err != nil {
				w.WriteHeader(http.StatusBadRequest)
				return
			}

			// The result of unregist must be captured; checking the stale
			// err from above could never detect a failure.
			if err = s.unregist(registration); err != nil {
				w.WriteHeader(http.StatusInternalServerError)
				return
			}
			w.WriteHeader(http.StatusOK)

		default:
			w.WriteHeader(http.StatusMethodNotAllowed)
		}
	})

	return http.ListenAndServe(serviceAddr, nil)
}

// heartBeat runs forever. Each round it takes a snapshot of all registered
// instances, hands them to heartBeatWorkerNumber worker goroutines and then
// sleeps for heartBeatCheckDuration. A worker unregisters an instance that
// fails every probe.
func (s *RegistryService) heartBeat() {
	channel := make(chan *Registration, 1)
	for i := 0; i < s.heartBeatWorkerNumber; i++ {
		go func() {
			for reg := range channel {
				if !s.isAlive(reg) {
					// Notification problems are logged by notify itself.
					_ = s.unregist(reg)
				}
			}
		}()
	}

	for {
		// Snapshot under the read lock and release it before sending.
		// Workers need the write lock to unregister dead instances; holding
		// the read lock while blocked on the channel would deadlock once
		// every worker is waiting for that write lock.
		s.serviceInfos.lock.RLock()
		var pending []*Registration
		for _, registrations := range s.serviceInfos.serviceInfos {
			pending = append(pending, registrations...)
		}
		s.serviceInfos.lock.RUnlock()

		for _, reg := range pending {
			channel <- reg
		}
		time.Sleep(s.heartBeatCheckDuration)
	}
}

// isAlive probes reg's /heart-beat endpoint up to heartBeatAttempCount
// times, sleeping heartBeatAttempDuration after every failed probe, and
// reports whether any probe answered with 200 OK.
func (s *RegistryService) isAlive(reg *Registration) bool {
	for j := 0; j < s.heartBeatAttempCount; j++ {
		resp, err := http.Get("http://" + reg.ServiceAddr + "/heart-beat")
		if err == nil {
			healthy := resp.StatusCode == http.StatusOK
			// Always close the body, otherwise every probe leaks a connection.
			resp.Body.Close()
			if healthy {
				return true
			}
		}
		time.Sleep(s.heartBeatAttempDuration)
	}
	return false
}

// regist stores registration in the service table and then notifies the
// instances that depend on it with a POST. It returns notify's error.
func (s *RegistryService) regist(registration *Registration) error {
	table := s.serviceInfos
	table.add(registration)
	return table.notify(http.MethodPost, registration)
}

// unregist removes registration from the service table and then notifies the
// instances that depend on it with a DELETE. It returns notify's error.
func (s *RegistryService) unregist(registration *Registration) error {
	table := s.serviceInfos
	table.remove(registration)
	return table.notify(http.MethodDelete, registration)
}
  • 注册服务客户端接口
package registry

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// registerMonitorHandler installs two handlers on the default mux:
//   - /services receives updates from the registry: POST adds the decoded
//     registration to provider, DELETE removes it. Other methods get 405 and
//     an undecodable body gets 500.
//   - /heart-beat always answers 200.
func registerMonitorHandler() {
	onServiceUpdate := func(w http.ResponseWriter, r *http.Request) {
		defer r.Body.Close()

		isAdd := r.Method == http.MethodPost
		if !isAdd && r.Method != http.MethodDelete {
			w.WriteHeader(http.StatusMethodNotAllowed)
			return
		}

		registration, err := buildRegistration(r.Body)
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		if isAdd {
			provider.add(registration)
			fmt.Printf("add service %s\n", registration.ServiceName)
		} else {
			provider.remove(registration)
			fmt.Printf("remove service %s\n", registration.ServiceName)
		}
		w.WriteHeader(http.StatusOK)
	}

	onHeartBeat := func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	}

	http.HandleFunc("/services", onServiceUpdate)
	http.HandleFunc("/heart-beat", onHeartBeat)
}

// RegistService installs the /services and /heart-beat handlers of this
// process, registers registration with the registry at serviceAddr and
// loads the instances of the required services from the registry's answer
// into provider.
//
// It returns an error when the registration cannot be encoded, the registry
// cannot be reached, the registry answers with a status other than 200, or
// the answer cannot be parsed.
func RegistService(registration *Registration) error {
	registerMonitorHandler()

	data, err := json.Marshal(registration)
	if err != nil {
		return err
	}

	resp, err := http.Post("http://"+serviceAddr+"/services", "application/json", bytes.NewReader(data))
	if err != nil {
		return err
	}
	// Close the body on every path, including the non-200 one; closing it a
	// second time after parseServiceInfos is harmless.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("Regist %s error with code %d", registration.ServiceName, resp.StatusCode)
	}

	return provider.parseServiceInfos(resp.Body)
}

// UnregistService asks the registry at serviceAddr to remove registration by
// sending it JSON-encoded in a DELETE request to /services.
//
// It returns an error when the registration cannot be encoded, the request
// cannot be built or sent, or the registry answers with a status other
// than 200.
func UnregistService(registration *Registration) error {
	data, err := json.Marshal(registration)
	if err != nil {
		return err
	}

	req, err := http.NewRequest(http.MethodDelete, "http://"+serviceAddr+"/services", bytes.NewReader(data))
	if err != nil {
		return err
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	// The body must be closed or the underlying connection is leaked.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("Unregist %s error with code %d", registration.ServiceName, resp.StatusCode)
	}

	return nil
}

// provider is this process's local table of the service instances it
// depends on. RegistService fills it from the registry's answer, the
// /services handler keeps it up to date, and Get reads from it.
var provider = newServiceTable()

// Get looks up serviceName in the local provider table and returns the
// instance that provider.get selects.
func Get(serviceName string) *Registration {
	selected := provider.get(serviceName)
	return selected
}
  • 服务入口
package main

import (
	"log"
	"services/registry"
)

// main runs the registry with its default settings and exits with a fatal
// log message when the server stops with an error.
func main() {
	if err := registry.Default().Run(); err != nil {
		log.Fatalln(err)
	}
}

基础服务模块

package service

import (
	"context"
	"fmt"
	"net/http"
	"services/registry"
)

// Service is implemented by anything that can be initialised via Init.
// NOTE(review): nothing in the code shown in this article references this
// interface — confirm it is still needed.
type Service interface {
	Init()
}

// Run registers registration with the registry, serves HTTP on
// registration.ServiceAddr using the default mux, and unregisters again when
// the server stops. The server is shut down gracefully as soon as anything
// is entered on standard input.
//
// It returns the registration error, or the error that stopped the server.
// A graceful shutdown is not an error. A failure to unregister is returned
// only when nothing else went wrong, so it can no longer hide the reason
// the server stopped (for example an address that is already in use).
func Run(registration *registry.Registration) (err error) {
	if err = registry.RegistService(registration); err != nil {
		return err
	}
	defer func() {
		if unregErr := registry.UnregistService(registration); unregErr != nil && err == nil {
			err = unregErr
		}
	}()

	srv := http.Server{Addr: registration.ServiceAddr}
	shutdownDone := make(chan struct{})

	go func() {
		defer close(shutdownDone)
		fmt.Println("Press any key to stop.")
		var s string
		fmt.Scan(&s)
		srv.Shutdown(context.Background())
	}()

	err = srv.ListenAndServe()
	// ListenAndServe returns http.ErrServerClosed itself after Shutdown, so
	// a direct comparison is sufficient here.
	if err == http.ErrServerClosed {
		// ListenAndServe returns as soon as Shutdown starts; wait until
		// Shutdown has finished draining in-flight requests.
		<-shutdownDone
		return nil
	}
	return err
}

被依赖的服务模块(日志服务)

  • 业务服务
package logservice

import (
	"io"
	"log"
	"net/http"
	"os"
)

// logService appends log lines received over HTTP to a file.
type logService struct {
	// destination is the path of the file that Write appends to.
	destination string
	// logger formats incoming messages and writes them through the service.
	logger *log.Logger
}

// Init creates a log service that appends to the file at destination and
// installs its /log handler on the default mux. Every line is prefixed with
// "Go:" followed by the time and the short file name of the call site.
func Init(destination string) {
	svc := new(logService)
	svc.destination = destination
	svc.logger = log.New(svc, "Go:", log.Ltime|log.Lshortfile)
	svc.register()
}

// Write implements io.Writer: it opens the destination file in append mode
// (creating it with mode 0600 when missing), writes data and closes the
// file again. It returns the byte count and error of the file write.
func (s *logService) Write(data []byte) (int, error) {
	const openFlags = os.O_CREATE | os.O_APPEND | os.O_WRONLY

	file, openErr := os.OpenFile(s.destination, openFlags, 0600)
	if openErr != nil {
		return 0, openErr
	}
	defer file.Close()

	written, writeErr := file.Write(data)
	return written, writeErr
}

// register installs the /log handler on the default mux. The handler accepts
// only POST (405 otherwise), rejects an unreadable or empty body with 400,
// and logs any other body as one line through the service logger.
func (s *logService) register() {
	handleLog := func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			w.WriteHeader(http.StatusMethodNotAllowed)
			return
		}

		body, readErr := io.ReadAll(r.Body)
		switch {
		case readErr != nil, len(body) == 0:
			w.WriteHeader(http.StatusBadRequest)
		default:
			s.logger.Println(string(body))
		}
	}

	http.HandleFunc("/log", handleLog)
}
  • 客户端接口
package logservice

import (
	"bytes"
	"fmt"
	"net/http"
	"services/registry"
)

// Println sends s as one log line to the log service instance described by
// registration, by POSTing it as text/plain to http://<ServiceAddr>/log.
//
// It returns an error when registration is nil (no log service instance is
// available), when the request fails, or when the service answers with a
// status other than 200.
func Println(registration *registry.Registration, s string) error {
	if registration == nil {
		return fmt.Errorf("log service instance is not available")
	}

	resp, err := http.Post("http://"+registration.ServiceAddr+"/log", "text/plain", bytes.NewReader([]byte(s)))
	if err != nil {
		return err
	}
	// The body must be closed or the underlying connection is leaked.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("Response Error with code: %d", resp.StatusCode)
	}
	return nil
}
  • 服务入口
package main

import (
	"log"
	"services/logservice"
	"services/registry"
	"services/service"
)

// main starts the log service: it logs to ./services.log, registers as
// "LogService" on 127.0.0.1:20002 with no required services, and exits with
// a fatal log message when the service stops with an error.
func main() {
	logservice.Init("./services.log")

	registration := &registry.Registration{
		ServiceName:      "LogService",
		ServiceAddr:      "127.0.0.1:20002",
		RequiredServices: []string{},
	}
	if err := service.Run(registration); err != nil {
		log.Fatalln(err)
	}
}

服务模块(访问服务)

  • 业务服务
package visistservice

import (
	"log"
	"net/http"
	"services/logservice"
	"services/registry"
	"strconv"
	"sync/atomic"
)

// visistService counts the requests it has served.
type visistService struct {
	// visistCount is incremented once for every request to "/".
	visistCount atomic.Int32
}

// Init creates a visit service whose counter starts at zero and installs
// its handler on the default mux.
func Init() {
	// The zero value of the counter field is ready to use.
	svc := new(visistService)
	svc.register()
}

// register installs the "/" handler on the default mux. Every request
// increments the visit counter, reports the new value to the log service
// and, when that succeeds, returns it as the response body with 200. When
// no log service instance is known or logging fails, the handler answers 500.
func (s *visistService) register() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		// Use the value returned by Add: a separate Load could observe
		// increments made by concurrent requests and report the same count
		// twice.
		count := strconv.Itoa(int(s.visistCount.Add(1)))

		target := registry.Get("LogService")
		if target == nil {
			w.WriteHeader(http.StatusInternalServerError)
			log.Println("Log service println error: no LogService instance registered")
			return
		}

		err := logservice.Println(target, count)
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			log.Printf("Log service println error: %s\n", err)
			return
		}
		w.WriteHeader(http.StatusOK)
		w.Write([]byte(count))
	})
}
  • 服务入口
package main

import (
	"log"
	"services/registry"
	"services/service"
	"services/visistservice"
)

// main starts the visit service: it registers as "VisistService" on
// 127.0.0.1:20003, declares "LogService" as a required service, and exits
// with a fatal log message when the service stops with an error.
func main() {
	visistservice.Init()

	registration := registry.Registration{
		ServiceName:      "VisistService",
		ServiceAddr:      "127.0.0.1:20003",
		RequiredServices: []string{"LogService"},
	}
	if err := service.Run(&registration); err != nil {
		log.Fatalln(err)
	}
}

运行效果

依次运行注册服务,日志服务,访问服务
在这里插入图片描述
运行完毕之后,访问http://127.0.0.1:20003,返回访问量
在这里插入图片描述

日志记录对应访问量数据
在这里插入图片描述
这里只是用了一个简单的示例,你可以使用这套基础组件,然后让服务变得更加复杂,更加丰富。

开发总结

恭喜你,我们一起完成了简易分布式系统的开发,麻雀虽小,五脏俱全😉
希望这个项目能让你有所收获😊
如果有什么错误,请你在评论区或者私信我指出,让我们一起进步!

文章来源:https://blog.csdn.net/qq_67733273/article/details/135072242
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。