分布式系统具有高可靠性、高性能、可扩展性、灵活性、数据共享、可靠性和地理分布等优点,使得其在各种应用场景下都具有巨大的优势,当然分布式系统实现复杂度要高于单体系统🫠
项目代码使用纯粹的Go语言标准库实现,不借用任何其它第三方库😁
我是醉墨居士,废话不多说,我们现在开始吧🤗
我们将要开发的分布式系统将会取其精华,去其糟粕,使用采用上述模型的混合模式😎
package registry
import (
"encoding/json"
"fmt"
"io"
"strings"
)
type Registration struct {
ServiceName string
ServiceAddr string
RequiredServices []string
}
func buildRegistration(reader io.ReadCloser) (*Registration, error) {
defer reader.Close()
data, err := io.ReadAll(reader)
if err != nil {
return nil, err
}
registration := new(Registration)
err = json.Unmarshal(data, registration)
if err != nil {
return nil, err
}
return registration, nil
}
func buildServiceInfo(reader io.ReadCloser) ([]string, error) {
defer reader.Close()
data, err := io.ReadAll(reader)
if err != nil {
return nil, err
}
parts := strings.SplitN(string(data), " ", 2)
if len(parts) != 2 {
return nil, fmt.Errorf("Parse service failed with length %d", len(parts))
}
return parts, nil
}
package registry
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"math/rand"
"net/http"
"sync"
)
type serviceTable struct {
serviceInfos map[string][]*Registration
lock *sync.RWMutex
}
func newServiceTable() *serviceTable {
return &serviceTable{
serviceInfos: make(map[string][]*Registration),
lock: new(sync.RWMutex),
}
}
func (t *serviceTable) parseServiceInfos(reader io.ReadCloser) (err error){
data, err := io.ReadAll(reader)
if err != nil {
return err
}
defer func() {
err = reader.Close()
}()
t.lock.Lock()
defer t.lock.Unlock()
err = json.Unmarshal(data, &t.serviceInfos)
return
}
func (t *serviceTable) buildRequiredServiceInfos(registration *Registration) map[string][]*Registration {
m := make(map[string][]*Registration, len(registration.RequiredServices))
t.lock.RLock()
defer t.lock.RUnlock()
for _, serviceName := range registration.RequiredServices {
m[serviceName] = t.serviceInfos[serviceName]
}
return m
}
func (t *serviceTable) notify(method string, registration *Registration) error {
if method != http.MethodPost && method != http.MethodDelete {
fmt.Println(method, method == http.MethodPost, method == http.MethodDelete)
return fmt.Errorf("Method not allowed with method: %s", method)
}
t.lock.RLock()
defer t.lock.RUnlock()
data, err := json.Marshal(registration)
if err != nil {
return err
}
for _, registrations := range t.serviceInfos {
for _, reg := range registrations {
for _, requiredServiceName := range reg.RequiredServices {
if requiredServiceName == registration.ServiceName {
req, err := http.NewRequest(method, "http://" + reg.ServiceAddr + "/services", bytes.NewReader(data))
if err != nil {
continue
}
log.Println("update url: ", reg.ServiceAddr + "/services")
http.DefaultClient.Do(req)
}
}
}
}
return nil
}
func (t *serviceTable) add(registration *Registration) {
t.lock.Lock()
defer t.lock.Unlock()
log.Printf("Service table add %s with address %s\n", registration.ServiceName, registration.ServiceAddr)
if registrations, ok := t.serviceInfos[registration.ServiceName]; ok {
registrations = append(registrations, registration)
} else {
t.serviceInfos[registration.ServiceName] = []*Registration{registration}
}
}
func (t *serviceTable) remove(registration *Registration) {
t.lock.Lock()
defer t.lock.Unlock()
log.Printf("Service table remove %s with address %s\n", registration.ServiceName, registration.ServiceAddr)
if registrations, ok := t.serviceInfos[registration.ServiceName]; ok {
for i := len(registrations) - 1; i >= 0; i-- {
if registrations[i].ServiceAddr == registration.ServiceAddr {
registrations = append(registrations[:i], registrations[i+1:]...)
}
}
}
}
func (t *serviceTable) get(serviceName string) *Registration {
t.lock.RLock()
defer t.lock.RUnlock()
regs := t.serviceInfos[serviceName]
return regs[rand.Intn(len(regs))]
}
package registry
import (
"encoding/json"
"net/http"
"time"
)
const (
serviceName = "Registry Service"
serviceAddr = "127.0.0.1:20000"
)
type RegistryService struct {
serviceInfos *serviceTable
heartBeatWorkerNumber int
heartBeatAttempCount int
heartBeatAttempDuration time.Duration
heartBeatCheckDuration time.Duration
}
func Default() *RegistryService {
return New(3, 3, time.Second, 30 * time.Second)
}
func New(heartBeatWorkerNumber, heartBeatAttempCount int, heartBeatAttempDuration, heartBeatCheckDuration time.Duration) *RegistryService {
return &RegistryService{
serviceInfos: newServiceTable(),
heartBeatWorkerNumber: heartBeatWorkerNumber,
heartBeatAttempCount: heartBeatAttempCount,
heartBeatAttempDuration: heartBeatAttempDuration,
heartBeatCheckDuration: heartBeatCheckDuration,
}
}
func (s *RegistryService) Run() error {
go s.heartBeat()
http.HandleFunc("/services", func(w http.ResponseWriter, r *http.Request) {
statusCode := http.StatusOK
switch r.Method {
case http.MethodPost:
registration, err := buildRegistration(r.Body)
if err != nil {
statusCode = http.StatusInternalServerError
goto END
}
err = s.regist(registration)
if err != nil {
statusCode = http.StatusInternalServerError
goto END
}
serviceInfos := s.serviceInfos.buildRequiredServiceInfos(registration)
data, err := json.Marshal(&serviceInfos)
if err != nil {
statusCode = http.StatusInternalServerError
goto END
}
defer w.Write(data)
case http.MethodDelete:
registration, err := buildRegistration(r.Body)
if err != nil {
statusCode = http.StatusInternalServerError
goto END
}
s.unregist(registration)
if err != nil {
statusCode = http.StatusInternalServerError
goto END
}
default:
statusCode = http.StatusMethodNotAllowed
goto END
}
END:
w.WriteHeader(statusCode)
})
return http.ListenAndServe(serviceAddr, nil)
}
func (s *RegistryService) heartBeat() {
channel := make(chan *Registration, 1)
for i := 0; i < s.heartBeatWorkerNumber; i++ {
go func() {
for reg := range channel {
for j := 0; j < s.heartBeatAttempCount; j++ {
resp, err := http.Get("http://" + reg.ServiceAddr + "/heart-beat")
if err == nil && resp.StatusCode == http.StatusOK {
goto NEXT
}
time.Sleep(s.heartBeatAttempDuration)
}
s.unregist(reg)
NEXT:
}
}()
}
for {
s.serviceInfos.lock.RLock()
for _, registrations := range s.serviceInfos.serviceInfos {
for i := len(registrations) - 1; i >= 0; i-- {
channel <- registrations[i]
}
}
s.serviceInfos.lock.RUnlock()
time.Sleep(s.heartBeatCheckDuration)
}
}
func (s *RegistryService) regist(registration *Registration) error {
s.serviceInfos.add(registration)
return s.serviceInfos.notify(http.MethodPost, registration)
}
func (s *RegistryService) unregist(registration *Registration) error {
s.serviceInfos.remove(registration)
return s.serviceInfos.notify(http.MethodDelete, registration)
}
package registry
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)
func registerMonitorHandler() {
http.HandleFunc("/services", func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
switch r.Method {
case http.MethodPost:
registration, err := buildRegistration(r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
provider.add(registration)
fmt.Printf("add service %s\n", registration.ServiceName)
case http.MethodDelete:
registration, err := buildRegistration(r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
provider.remove(registration)
fmt.Printf("remove service %s\n", registration.ServiceName)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
w.WriteHeader(http.StatusOK)
})
http.HandleFunc("/heart-beat", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
}
func RegistService(registration *Registration) error {
registerMonitorHandler()
data, err := json.Marshal(registration)
if err != nil {
return err
}
resp, err := http.Post("http://" + serviceAddr + "/services", "application/json", bytes.NewReader(data))
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("Regist %s error with code %d", registration.ServiceName, resp.StatusCode)
}
err = provider.parseServiceInfos(resp.Body)
if err != nil {
return err
}
return nil
}
func UnregistService(registration *Registration) error {
data, err := json.Marshal(registration)
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodDelete, "http://" + serviceAddr + "/services", bytes.NewReader(data))
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("Unregist %s error with code %d", registration.ServiceName, resp.StatusCode)
}
return nil
}
var provider = newServiceTable()
func Get(serviceName string) *Registration {
return provider.get(serviceName)
}
package main
import (
"log"
"services/registry"
)
func main() {
registryService := registry.Default()
err := registryService.Run()
if err != nil {
log.Fatalln(err)
}
}
package service
import (
"context"
"fmt"
"net/http"
"services/registry"
)
type Service interface {
Init()
}
func Run(registration *registry.Registration) (err error) {
err = registry.RegistService(registration)
if err != nil {
return err
}
defer func() {
err = registry.UnregistService(registration)
}()
srv := http.Server{Addr: registration.ServiceAddr}
go func() {
fmt.Println("Press any key to stop.")
var s string
fmt.Scan(&s)
srv.Shutdown(context.Background())
}()
err = srv.ListenAndServe()
if err != nil {
return err
}
return nil
}
package logservice
import (
"io"
"log"
"net/http"
"os"
)
type logService struct {
destination string
logger *log.Logger
}
func Init(destination string) {
s := &logService{
destination: destination,
}
s.logger = log.New(s, "Go:", log.Ltime | log.Lshortfile)
s.register()
}
func (s *logService)Write(data []byte) (int, error) {
file, err := os.OpenFile(s.destination, os.O_CREATE | os.O_APPEND | os.O_WRONLY, 0600)
if err != nil {
return 0, err
}
defer file.Close()
return file.Write(data)
}
func (s *logService)register() {
http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
data, err := io.ReadAll(r.Body)
if err != nil || len(data) == 0 {
w.WriteHeader(http.StatusBadRequest)
return
}
s.logger.Println(string(data))
})
}
package logservice
import (
"bytes"
"fmt"
"net/http"
"services/registry"
)
func Println(registration *registry.Registration, s string) error {
resp, err := http.Post("http://"+registration.ServiceAddr+"/log", "text/plain", bytes.NewReader([]byte(s)))
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("Response Error with code: %d", resp.StatusCode)
}
return nil
}
package main
import (
"log"
"services/logservice"
"services/registry"
"services/service"
)
func main() {
logservice.Init("./services.log")
err := service.Run(®istry.Registration{
ServiceName: "LogService",
ServiceAddr: "127.0.0.1:20002",
RequiredServices: make([]string, 0),
})
if err != nil {
log.Fatalln(err)
}
}
package visistservice
import (
"log"
"net/http"
"services/logservice"
"services/registry"
"strconv"
"sync/atomic"
)
type visistService struct {
visistCount atomic.Int32
}
func Init() {
s := &visistService{
visistCount: atomic.Int32{},
}
s.register()
}
func (s *visistService) register() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
s.visistCount.Add(1)
count := strconv.Itoa(int(s.visistCount.Load()))
err := logservice.Println(registry.Get("LogService"), count)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.Printf("Log service println error: %s\n", err)
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte(count))
})
}
package main
import (
"log"
"services/registry"
"services/service"
"services/visistservice"
)
func main() {
visistservice.Init()
err := service.Run(®istry.Registration{
ServiceName: "VisistService",
ServiceAddr: "127.0.0.1:20003",
RequiredServices: []string{"LogService"},
})
if err != nil {
log.Fatalln(err)
}
}
依次运行注册服务,日志服务,浏览服务
运行完毕之后,访问http://127.0.0.1:20003,返回访问量
日志记录对应访问量数据
这里只是用了一个简单的示例,你可以使用这套基础组件,然后让服务变得更加复杂,更加丰富。
恭喜你,我们一起完成了简易分布式系统的开发,麻雀虽小,五脏俱全😉
希望这个项目能让你有所收获😊
如果有什么错误,请你评论区或者私信我指出,让我们一起进步??