Building a DNS Service (Part 7)

An analysis of PowerDNS internals

Posted by Jht on July 23, 2018

PowerDNS Internals Analysis

Why look into this?

It started with PowerDNS's replication modes: the documentation states that Native replication is supported by default and needs no extra configuration.

Reference: DNS Modes of Operation

So how is that actually supported?

Reading the source code reveals the internal architecture.

Architecture

(PowerDNS internal architecture diagram)

Analysis

The relevant comments are included in the code snippets below.

Startup flow

Guardian mode

If the startup arguments include "--guardian=yes", PowerDNS runs in guardian mode: it forks a child process that performs the normal startup flow, while the parent process checks once per second whether the child is still alive and respawns it if it exits abnormally.

The code snippet is as follows:


// receiver.cc

int main(int argc, char **argv)
{
    ......
    if(::arg().mustDo("guardian") && !isGuarded(argv)) {
      if(::arg().mustDo("daemon")) {
        g_log.toConsole(Logger::Critical);
        daemonize();
      }
      guardian(argc, argv);
      // never get here, guardian will reinvoke process

      cerr<<"Um, we did get here!"<<endl;
    }
    .......
}

static int guardian(int argc, char **argv)
{
    ........
    for(;;) {
        ........
        if(pid>0) { // parent: supervise the child
            ........
            for(;;) {
                int ret=waitpid(pid,&status,WNOHANG);

                if(ret<0) {
                    g_log<<Logger::Error<<"In guardian loop, waitpid returned error: "<<strerror(errno)<<endl;
                    g_log<<Logger::Error<<"Dying"<<endl;
                    exit(1);
                }
                else if(ret) // something exited
                    break;
                else { // child is alive
                    // execute some kind of ping here
                    if(DLQuitPlease())
                        takedown(1); // needs a parameter

                    setStatus("Child running on pid "+itoa(pid));
                    sleep(1);
                }
            }

            pthread_mutex_lock(&g_guardian_lock);
            close(g_fd1[1]);
            fclose(g_fp);
            g_fp=0;

            if(WIFEXITED(status)) {
                int ret=WEXITSTATUS(status);

                if(ret==99) {
                    g_log<<Logger::Error<<"Child requested a stop, exiting"<<endl;
                    exit(1);
                }
                setStatus("Child died with code "+itoa(ret));
                g_log<<Logger::Error<<"Our pdns instance exited with code "<<ret<<", respawning"<<endl;

                sleep(1);
                continue;
            }
            if(WIFSIGNALED(status)) {
                int sig=WTERMSIG(status);
                setStatus("Child died because of signal "+itoa(sig));
                g_log<<Logger::Error<<"Our pdns instance ("<<pid<<") exited after signal "<<sig<<endl;
#ifdef WCOREDUMP
                if(WCOREDUMP(status))
                    g_log<<Logger::Error<<"Dumped core"<<endl;
#endif

                g_log<<Logger::Error<<"Respawning"<<endl;
                sleep(1);
                continue;
            }
            g_log<<Logger::Error<<"No clue what happened! Respawning"<<endl;
        }
        else {
            g_log<<Logger::Error<<"Unable to fork: "<<strerror(errno)<<endl;
            exit(1);
        }
    }
    ........
}
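
Stripped of the PowerDNS-specific logging and control-socket handling, the guardian is just fork() plus a waitpid(WNOHANG) polling loop. Below is a minimal, self-contained sketch of that pattern (illustrative only, not PowerDNS code; run_worker is a hypothetical stand-in for the real server process):

#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <cstdio>

static void run_worker() {
  // Hypothetical stand-in for the real server: exit after a while so the guardian respawns us.
  sleep(5);
  _exit(42);
}

int main() {
  for (;;) {
    pid_t pid = fork();
    if (pid == 0) {                 // child: run the actual service
      run_worker();
    } else if (pid > 0) {           // parent: supervise the child
      for (;;) {
        int status = 0;
        pid_t ret = waitpid(pid, &status, WNOHANG);
        if (ret < 0) {              // waitpid itself failed: give up
          perror("waitpid");
          return 1;
        } else if (ret == pid) {    // the child exited or was killed
          if (WIFEXITED(status))
            printf("child exited with code %d, respawning\n", WEXITSTATUS(status));
          else if (WIFSIGNALED(status))
            printf("child killed by signal %d, respawning\n", WTERMSIG(status));
          break;                    // fall through to the outer loop and fork again
        }
        sleep(1);                   // child still alive: check again in a second
      }
    } else {
      perror("fork");
      return 1;
    }
  }
}
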
Loading modules

The modules listed in load-modules are loaded first.

The backends named in launch are then launched.

The code snippet is as follows:

//receiver.cc

int main(int argc, char **argv)
{
    ......
    if(!::arg()["load-modules"].empty()) {
      vector<string> modules;

      stringtok(modules,::arg()["load-modules"], ", ");
      if (!UeberBackend::loadModules(modules, ::arg()["module-dir"])) {
        exit(1);
      }
    }
    BackendMakers().launch(::arg()["launch"]);

    ......
}
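
For background, loading a module here boils down to opening a shared object at runtime. The sketch below shows the general dlopen mechanism under the assumption that each backend ships as a lib<name>backend.so whose static initializers register its factory when the library is opened; load_modules and the exact naming are illustrative, not the real UeberBackend::loadModules implementation:

#include <dlfcn.h>
#include <cstdio>
#include <string>
#include <vector>

// Hypothetical helper: dlopen each module from the given directory. Loading the
// shared object runs its static initializers, which is where a backend would
// register its factory.
bool load_modules(const std::vector<std::string>& modules, const std::string& dir) {
  for (const auto& name : modules) {
    std::string path = dir + "/lib" + name + "backend.so";   // assumed naming convention
    void* handle = dlopen(path.c_str(), RTLD_NOW | RTLD_GLOBAL);
    if (!handle) {
      std::fprintf(stderr, "unable to load %s: %s\n", path.c_str(), dlerror());
      return false;
    }
  }
  return true;
}
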
Instantiating the DynListener

The code snippet is as follows:

//receiver.cc

int main(int argc, char **argv)
{
    ......
    if(isGuarded(argv)) {
      g_log<<Logger::Warning<<"This is a guarded instance of pdns"<<endl;
      dl=new DynListener; // listens on stdin 
    }
    else {
      g_log<<Logger::Warning<<"This is a standalone pdns"<<endl; 
      
      if(::arg().mustDo("control-console"))
        dl=new DynListener();
      else
        dl=new DynListener(s_programname);
      
      writePid();
    }
    ......
}
Instantiating the UDPNameserver
//receiver.cc

int main(int argc, char **argv)
{
    .....
    N=std::make_shared<UDPNameserver>(); // this fails when we are not root, throws exception

    g_udpReceivers.push_back(N);

    size_t rthreads = ::arg().asNum("receiver-threads", 1);
    if (rthreads > 1 && N->canReusePort()) {
      g_udpReceivers.resize(rthreads);

      for (size_t idx = 1; idx < rthreads; idx++) {
        try {
          g_udpReceivers[idx] = std::make_shared<UDPNameserver>(true);
        }
        catch(const PDNSException& e) {
          g_log<<Logger::Error<<"Unable to reuse port, falling back to original bind"<<endl;
          break;
        }
      }
    }
    ......
}
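
The canReusePort / UDPNameserver(true) path above relies on the kernel's SO_REUSEPORT feature: every receiver thread can bind its own socket to the same address and port, and the kernel spreads incoming queries across them. A minimal sketch of such a socket setup follows (illustrative only; make_reuseport_udp_socket is a made-up helper, not PowerDNS code):

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdint>
#include <cstring>
#include <stdexcept>

// Hypothetical helper: one UDP socket per receiver thread, all bound to the
// same port thanks to SO_REUSEPORT (Linux >= 3.9).
int make_reuseport_udp_socket(uint16_t port) {
  int fd = socket(AF_INET, SOCK_DGRAM, 0);
  if (fd < 0)
    throw std::runtime_error("socket() failed");

  int one = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0) {
    close(fd);
    throw std::runtime_error("SO_REUSEPORT not supported");
  }

  sockaddr_in addr;
  std::memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  addr.sin_port = htons(port);
  if (bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0) {
    close(fd);
    throw std::runtime_error("bind() failed");
  }
  return fd;   // e.g. call this once from every receiver thread
}
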
Instantiating the TCPNameserver
//receiver.cc

int main(int argc, char **argv)
{
    .....
    if(!::arg().mustDo("disable-tcp"))
      TN=new TCPNameserver; 
    ......
}
Starting the DNSProxy
//common_startup.cc

void mainthread()
{
    .....
    if(::arg().mustDo("resolver")){
        DP=new DNSProxy(::arg()["resolver"]);
        DP->go();
    }
    .....
}
Starting the webserver
//common_startup.cc

void mainthread()
{
    .....
    if(::arg().mustDo("webserver") || ::arg().mustDo("api"))
        webserver.go();
    .....
}
Starting the DynListener
//common_startup.cc

void mainthread()
{
    .....
    dl->go();
    .....
}
Starting the TCPNameserver
//common_startup.cc

void mainthread()
{
    .....
    if(TN)
        TN->go(); // tcp nameserver launch
    .....
}

The TCPNameserver creates a worker thread for each TCP connection it accepts.

//tcpreceiver.cc

void TCPNameserver::thread()
{
    .......
    if(pthread_create(&tid, 0, &doConnection, reinterpret_cast<void*>(fd))) {
        g_log<<Logger::Error<<"Error creating thread: "<<stringerror()<<endl;
        d_connectionroom_sem->post();
        close(fd);
        decrementClientCount(remote);
    }
    .......
}
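
This is the classic thread-per-connection model: accept a TCP client, then hand the file descriptor to its own worker thread. A stripped-down sketch of the pattern (illustrative only, using std::thread instead of pthreads and a trivial echo handler in place of real DNS processing):

#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <thread>

// Hypothetical handler: a real server would parse a DNS message here; we just
// echo whatever arrives and close the connection.
static void handle_connection(int fd) {
  char buf[512];
  ssize_t n = read(fd, buf, sizeof(buf));
  if (n > 0 && write(fd, buf, n) < 0) {
    // nothing sensible to do in a sketch; just fall through and close
  }
  close(fd);
}

// Accept loop: exactly one detached worker thread per accepted connection.
static void accept_loop(int listen_fd) {
  for (;;) {
    int fd = accept(listen_fd, nullptr, nullptr);
    if (fd < 0)
      continue;
    std::thread(handle_connection, fd).detach();
  }
}
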

Allocating UDPNameserver worker threads
//common_startup.cc

void mainthread()
{
    .....
    unsigned int max_rthreads= ::arg().asNum("receiver-threads", 1);
    g_distributors.resize(max_rthreads);
    for(unsigned int n=0; n < max_rthreads; ++n)
        pthread_create(&qtid,0,qthread, reinterpret_cast<void *>(n));
    .....
}

Each receiver thread in turn creates its own distributor threads.

//common_startup.cc

void *qthread(void *number)
{
    .....
    DNSPacket *P;
    DNSDistributor *distributor = DNSDistributor::Create(::arg().asNum("distributor-threads", 1)); // the big dispatcher!
    .....
}
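
Conceptually, a distributor is a queue plus a pool of worker threads: the receiver pushes questions in, workers pop them off, run the packet handler, and hand the answer to a callback. The sketch below illustrates that idea only; SimpleDistributor, Question and the callback types are hypothetical and far simpler than the real DNSDistributor:

#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

struct Question { std::string qname; };   // hypothetical stand-in for DNSPacket

// Hypothetical, much-simplified distributor: a locked queue drained by a fixed
// pool of worker threads.
class SimpleDistributor {
public:
  using Handler  = std::function<std::string(const Question&)>;  // plays the role of PacketHandler::question
  using Callback = std::function<void(const std::string&)>;      // plays the role of the sendout callback

  SimpleDistributor(size_t nthreads, Handler h) : d_handler(std::move(h)) {
    for (size_t i = 0; i < nthreads; ++i)
      std::thread([this] { work(); }).detach();   // workers live for the lifetime of the process
  }

  // Called by the receiver thread for every question that missed the cache.
  void question(Question q, Callback cb) {
    {
      std::lock_guard<std::mutex> l(d_lock);
      d_queue.emplace(std::move(q), std::move(cb));
    }
    d_cond.notify_one();
  }

private:
  void work() {
    for (;;) {
      std::unique_lock<std::mutex> l(d_lock);
      d_cond.wait(l, [this] { return !d_queue.empty(); });
      auto item = std::move(d_queue.front());
      d_queue.pop();
      l.unlock();
      item.second(d_handler(item.first));  // answer the question, then hand it to the callback
    }
  }

  Handler d_handler;
  std::mutex d_lock;
  std::condition_variable d_cond;
  std::queue<std::pair<Question, Callback>> d_queue;
};
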

Query flow

A query is first looked up in the PacketCache; only on a miss does it go to the backend.

The PacketCache is a cache of backend responses.
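
In other words this is a classic cache-aside lookup: check the cache, and only on a miss ask the backend and store the answer for next time. A minimal sketch of that flow (illustrative only; MiniPacketCache keys on a plain string rather than on the real question tuple of name, type and flags):

#include <functional>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>

// Hypothetical mini-cache, not the real PacketCache.
class MiniPacketCache {
public:
  std::optional<std::string> get(const std::string& question) {
    std::lock_guard<std::mutex> l(d_lock);
    auto it = d_cache.find(question);
    if (it == d_cache.end())
      return std::nullopt;                  // cache MISS
    return it->second;                      // cache HIT
  }
  void insert(const std::string& question, const std::string& answer) {
    std::lock_guard<std::mutex> l(d_lock);
    d_cache[question] = answer;
  }
private:
  std::mutex d_lock;
  std::unordered_map<std::string, std::string> d_cache;
};

// Cache-aside lookup: PacketCache first, backend only on a miss.
std::string answer_query(MiniPacketCache& pc, const std::string& question,
                         const std::function<std::string(const std::string&)>& backend) {
  if (auto cached = pc.get(question))
    return *cached;                         // step 1: answered straight from the cache
  std::string reply = backend(question);    // step 2: ask the backend
  pc.insert(question, reply);               // remember it for the next identical question
  return reply;
}
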

TCP queries

//tcpreceiver.cc

void *TCPNameserver::doConnection(void *data)
{
    .....
    // Step 1: look the question up in the PacketCache

    // short circuit - does the PacketCache recognize this question?
    if(packet->couldBeCached() && PC.get(packet.get(), cached.get())) {
        if(logDNSQueries)
          g_log<<"packetcache HIT"<<endl;
        cached->setRemote(&packet->d_remote);
        cached->d.id=packet->d.id;
        cached->d.rd=packet->d.rd; // copy in recursion desired bit
        cached->commitD(); // commit d to the packet  inlined

        sendPacket(cached, fd); // presigned, don't do it again
        continue;
      }
    .....

    // Step 2: query the backend
      if(logDNSQueries)
          g_log<<"packetcache MISS"<<endl;
      {
        Lock l(&s_plock);
        if(!s_P) {
          g_log<<Logger::Error<<"TCP server is without backend connections, launching"<<endl;
          s_P=new PacketHandler;
        }

        // we really need to ask the backend :-)
        // doQuestion goes to the database and puts the result into the PacketCache
        reply=shared_ptr<DNSPacket>(s_P->doQuestion(packet.get()));
      }

      if(!reply)  // unable to write an answer?
        break;

      sendPacket(reply, fd);
}

//packethandler.cc

// Insert the reply into the PacketCache

DNSPacket *PacketHandler::doQuestion(DNSPacket *p)
{
    .........

    r->wrapup(); // needed for inserting in cache

    if(!noCache && p->couldBeCached())
      PC.insert(p, r, r->getMinTTL()); // in the packet cache

    .........
}

UDP queries

// common_startup.cc

void *qthread(void *number)
try
{
    ......
    // Step 1: look the question up in the PacketCache
    if((P->d.opcode != Opcode::Notify && P->d.opcode != Opcode::Update) && P->couldBeCached()) {

        // does the PacketCache recognize this question?
        bool haveSomething=PC.get(P, &cached);
        if (haveSomething) {
            if(logDNSQueries)
                g_log<<"packetcache HIT"<<endl;

            cached.setRemote(&P->d_remote);  // inlined
            cached.setSocket(P->getSocket()); // inlined
            cached.d_anyLocal = P->d_anyLocal;
            cached.setMaxReplyLen(P->getMaxReplyLen());
            cached.d.rd=P->d.rd; // copy in recursion desired bit
            cached.d.id=P->d.id;
            cached.commitD(); // commit d to the packet inlined

            NS->send(&cached); // answer it then inlined

            diff=P->d_dt.udiff();
            avg_latency=(int)(0.999*avg_latency+0.001*diff); // 'EWMA'
            continue;
        }
    }

    .......

    // Step 2: query the backend
    try {
      // distributor->question() ends up in DNSPacket *PacketHandler::question(DNSPacket *p),
      // which calls doQuestion(), the same method the TCP path uses in the end.
      // The sendout callback returns the response to the client.
      distributor->question(P, &sendout);
    }
    catch(DistributorFatal& df) { // when this happens, we have leaked loads of memory. Bailing out time.
      _exit(1);
    }
    ......
}

UeberBackend

UeberBackend is a special backend: at startup the loaded backends register with it and are stored in a vector, and UeberBackend then calls the corresponding member methods of those backends in registration order.
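
The idea boils down to a chain of backends tried in registration order, where the first one that produces an answer wins. The sketch below illustrates that pattern with a hypothetical, much-simplified Backend interface (not the real DNSBackend API):

#include <memory>
#include <optional>
#include <string>
#include <vector>

// Hypothetical backend interface, far simpler than the real DNSBackend.
struct Backend {
  virtual ~Backend() = default;
  // Returns an answer for qname, or nullopt if this backend has none.
  virtual std::optional<std::string> lookup(const std::string& qname) = 0;
};

class MiniUeberBackend {
public:
  // Backends are registered here in launch order at startup.
  void registerBackend(std::unique_ptr<Backend> b) {
    d_backends.push_back(std::move(b));
  }

  // Try each backend in registration order; the first answer wins.
  std::optional<std::string> lookup(const std::string& qname) {
    for (auto& b : d_backends)
      if (auto answer = b->lookup(qname))
        return answer;
    return std::nullopt;                    // no backend knew this name
  }

private:
  std::vector<std::unique_ptr<Backend>> d_backends;
};
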

Call flow

// ueberbackend.cc

// Step 1: UeberBackend::get delegates to d_handle.get(rr)
bool UeberBackend::get(DNSZoneRecord &rr)
{
  ......
  if(!d_handle.get(rr)) {
    ......
  }
    .......
}

bool UeberBackend::handle::get(DNSZoneRecord &r)
{
  .......
  // Step 2: d_handle.get first calls get() on the current (first) backend

  while(d_hinterBackend && !(isMore=d_hinterBackend->get(r))) { // this backend out of answers

  // Step 3: if it runs out of answers, call lookup() and get() on the next backend,
  // until one answers or all backends have been tried

    if(i<parent->backends.size()) {

      DLOG(g_log<<"Backend #"<<i<<" of "<<parent->backends.size()
           <<" out of answers, taking next"<<endl);
      
      d_hinterBackend=parent->backends[i++];
      d_hinterBackend->lookup(qtype,qname,pkt_p,parent->d_domain_id);
    }
    else 
      break; // no backends left to try

    DLOG(g_log<<"Now asking backend #"<<i<<endl);
  }
  if(!isMore && i==parent->backends.size()) {
    DLOG(g_log<<"UeberBackend reached end of backends"<<endl);
    return false;
  }

  DLOG(g_log<<"Found an answering backend - will not try another one"<<endl);
  i=parent->backends.size(); // don't go on to the next backend
  return true;
}
 

Communicator

  • slave: handles incoming notifications, including checking whether they come from a supermaster and creating the corresponding slave domain, and does AXFR from the master
  • master: checks whether any of the master's domains have been updated and, if so, puts them into the notification queue
  • doNotifications: on the master, checks the queue for pending notifications and sends them out
//communicator.cc

void CommunicatorClass::mainloop(void)
{
  try {
    .......

    // run a check every slave-cycle-interval seconds

    d_tickinterval=::arg().asNum("slave-cycle-interval");

    .......
    for(;;) {

      slaveRefresh(&P); // creates the corresponding slave domains and does AXFR from the master

      masterUpdateCheck(&P); // checks whether master domains have been updated, including changes made through the API

      tick=doNotifications(); //this processes any notification acknowledgements and actually send out our own notifications
      
      tick = min (tick, d_tickinterval);
      next=time(0)+tick;
      while(time(0) < next) {
        rc=d_any_sem.tryWait();

        if(rc) {
          bool extraSlaveRefresh = false;
          Utility::sleep(1);
          {
            Lock l(&d_lock);
            if (d_tocheck.size())
              extraSlaveRefresh = true;
          }
          if (extraSlaveRefresh)
            slaveRefresh(&P);
        }
        else { 
          break; // something happened

        }
        // this gets executed at least once every second

        doNotifications();
      }
    }
  ........
}
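
The shape of this loop is: do the periodic work, then wait up to slave-cycle-interval seconds in one-second steps so an external wake-up (for example an incoming NOTIFY) can trigger an early refresh. A stripped-down sketch of that pattern (illustrative only; g_wakeup stands in for d_any_sem and the real work is elided):

#include <atomic>
#include <chrono>
#include <ctime>
#include <thread>

std::atomic<bool> g_wakeup{false};   // hypothetical stand-in for d_any_sem

void mainloop_sketch(int tickinterval) {
  for (;;) {
    // ... periodic work: slaveRefresh(), masterUpdateCheck(), doNotifications() ...
    std::time_t next = std::time(nullptr) + tickinterval;
    while (std::time(nullptr) < next) {
      if (g_wakeup.exchange(false))
        break;                       // something happened (e.g. a NOTIFY arrived): refresh early
      std::this_thread::sleep_for(std::chrono::seconds(1));
      // notifications are also processed at least once per second here
    }
  }
}
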

masterUpdateCheck

//mastercommunicator.cc

void CommunicatorClass::masterUpdateCheck(PacketHandler *P)
{
  .......
  // fetch the master domains that have been updated

  B->getUpdatedMasters(&cmdomains); 
  ........
  
  for(auto& di : cmdomains) {

    purgeAuthCachesExact(di.zone);

    queueNotifyDomain(di, B); // queue NOTIFY messages for this domain

    di.backend->setNotified(di.id, di.serial); // mark the domain as notified at this serial

  }
}

Handling received notifications

//packethandler.cc

int PacketHandler::processNotify(DNSPacket *p)
{
    ......
}