Skip to content
16 changes: 15 additions & 1 deletion servers/src/common/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::sync::Arc;
use chrono::prelude::Utc;
use rand::prelude::*;

use crate::api;
use crate::chain;
use crate::core::global::{ChainTypes, DEFAULT_FUTURE_TIME_LIMIT};
use crate::core::{core, libtx, pow};
Expand All @@ -28,6 +27,7 @@ use crate::p2p;
use crate::pool;
use crate::pool::types::DandelionConfig;
use crate::store;
use crate::{api, Server};

/// Error type wrapping underlying module errors.
#[derive(Debug)]
Expand Down Expand Up @@ -405,3 +405,17 @@ impl DandelionEpoch {
self.relay_peer.clone()
}
}

/// Server initialization status.
pub enum ServerInitStatus {
/// Database loading.
LoadDatabase,
/// P2P server initialization.
StartSync,
/// API server initialization.
StartAPI,
/// Server instance after successful initialization.
FinishedLoading(Server),
/// Error on initialization.
ErrorLoading(Error),
}
31 changes: 19 additions & 12 deletions servers/src/grin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::common::hooks::{init_chain_hooks, init_net_hooks};
use crate::common::stats::{
ChainStats, DiffBlock, DiffStats, PeerStats, ServerStateInfo, ServerStats, TxStats,
};
use crate::common::types::{Error, ServerConfig, StratumServerConfig};
use crate::common::types::{Error, ServerConfig, ServerInitStatus, StratumServerConfig};
use crate::core::core::hash::{Hashed, ZERO_HASH};
use crate::core::ser::ProtocolVersion;
use crate::core::{consensus, genesis, global, pow};
Expand All @@ -52,7 +52,6 @@ use crate::pool;
use crate::util::file::get_first_line;
use crate::util::{RwLock, StopState};
use futures::channel::oneshot;
use grin_util::logger::LogEntry;

/// Arcified thread-safe TransactionPool with type parameters used by server components
pub type ServerTxPool = Arc<RwLock<pool::TransactionPool<PoolToChainAdapter, PoolToNetAdapter>>>;
Expand Down Expand Up @@ -84,20 +83,16 @@ impl Server {
/// Instantiates and starts a new server. Optionally takes a callback
/// for the server to send an ARC copy of itself, to allow another process
/// to poll info about the server status
pub fn start<F>(
pub fn start(
config: ServerConfig,
logs_rx: Option<mpsc::Receiver<LogEntry>>,
mut info_callback: F,
stop_state: Option<Arc<StopState>>,
server_tx: Option<mpsc::Sender<ServerInitStatus>>,
api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>),
) -> Result<(), Error>
where
F: FnMut(Server, Option<mpsc::Receiver<LogEntry>>),
{
) -> Result<Server, Error> {
let mining_config = config.stratum_mining_config.clone();
let enable_test_miner = config.run_test_miner;
let test_miner_wallet_url = config.test_miner_wallet_url.clone();
let serv = Server::new(config, stop_state, api_chan)?;
let serv = Server::new(config, stop_state, server_tx, api_chan)?;

if let Some(c) = mining_config {
let enable_stratum_server = c.enable_stratum_server;
Expand All @@ -118,8 +113,7 @@ impl Server {
}
}

info_callback(serv, logs_rx);
Ok(())
Ok(serv)
}

// Exclusive (advisory) lock_file to ensure we do not run multiple
Expand Down Expand Up @@ -151,6 +145,7 @@ impl Server {
pub fn new(
config: ServerConfig,
stop_state: Option<Arc<StopState>>,
server_tx: Option<mpsc::Sender<ServerInitStatus>>,
api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>),
) -> Result<Server, Error> {
// Obtain our lock_file or fail immediately with an error.
Expand Down Expand Up @@ -193,6 +188,10 @@ impl Server {

info!("Starting server, genesis block: {}", genesis.hash());

if let Some(ref server_tx) = server_tx {
let _ = server_tx.send(ServerInitStatus::LoadDatabase);
}

let shared_chain = Arc::new(chain::Chain::init(
config.db_root.clone(),
chain_adapter.clone(),
Expand Down Expand Up @@ -220,6 +219,10 @@ impl Server {
};
debug!("Capabilities: {:?}", capabilities);

if let Some(ref server_tx) = server_tx {
let _ = server_tx.send(ServerInitStatus::StartSync);
}

let p2p_server = Arc::new(p2p::Server::new(
&config.db_root,
capabilities,
Expand Down Expand Up @@ -265,6 +268,10 @@ impl Server {
}
})?;

if let Some(ref server_tx) = server_tx {
let _ = server_tx.send(ServerInitStatus::StartAPI);
}

info!("Starting rest apis at: {}", &config.api_http_addr);
let api_secret = get_first_line(config.api_secret_path.clone());
let foreign_api_secret = get_first_line(config.foreign_api_secret_path.clone());
Expand Down
48 changes: 23 additions & 25 deletions src/bin/cmd/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use crate::tui::ui;
use futures::channel::oneshot;
use grin_p2p::msg::PeerAddrs;
use grin_p2p::PeerAddr;
use grin_servers::common::types::ServerInitStatus;
use grin_servers::Server;
use grin_util::logger::LogEntry;
use std::sync::mpsc;

Expand All @@ -46,29 +48,25 @@ fn start_server_tui(
logs_rx: Option<mpsc::Receiver<LogEntry>>,
api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>),
) {
// Run the UI controller.. here for now for simplicity to access
// everything it might need
if config.run_tui.unwrap_or(false) {
warn!("Starting GRIN in UI mode...");
servers::Server::start(
config,
logs_rx,
|serv: servers::Server, logs_rx: Option<mpsc::Receiver<LogEntry>>| {
let mut controller = ui::Controller::new(logs_rx.unwrap()).unwrap_or_else(|e| {
panic!("Error loading UI controller: {}", e);
});
controller.run(serv);
},
None,
api_chan,
)
.unwrap();
// Run the UI controller.
let (serv_tx, serv_rx) = mpsc::channel::<ServerInitStatus>();
let mut controller = ui::Controller::new(logs_rx, serv_rx).unwrap_or_else(|e| {
panic!("Error loading UI controller: {}", e);
});
let serv_tx_clone = serv_tx.clone();
thread::spawn(move || {
Comment thread
ardocrat marked this conversation as resolved.
match Server::start(config, None, Some(serv_tx_clone.clone()), api_chan) {
Ok(s) => serv_tx_clone.send(ServerInitStatus::FinishedLoading(s)),
Err(e) => serv_tx_clone.send(ServerInitStatus::ErrorLoading(e)),
}
});
controller.run();
} else {
warn!("Starting GRIN w/o UI...");
servers::Server::start(
config,
logs_rx,
|serv: servers::Server, _: Option<mpsc::Receiver<LogEntry>>| {
match Server::start(config, None, None, api_chan) {
Ok(s) => {
let running = Arc::new(AtomicBool::new(true));
let r = running.clone();
ctrlc::set_handler(move || {
Expand All @@ -79,12 +77,12 @@ fn start_server_tui(
thread::sleep(Duration::from_secs(1));
}
warn!("Received SIGINT (Ctrl+C) or SIGTERM (kill).");
serv.stop();
},
None,
api_chan,
)
.unwrap();
s.stop();
}
Err(e) => {
Comment thread
ardocrat marked this conversation as resolved.
error!("Error starting GRIN: {:?}", e);
}
}
}
}

Expand Down
81 changes: 70 additions & 11 deletions src/bin/tui/ui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ use crate::tui::constants::{ROOT_STACK, VIEW_BASIC_STATUS, VIEW_MINING, VIEW_PEE
use crate::tui::types::{TUIStatusListener, UIMessage};
use crate::tui::{logs, menu, mining, peers, status, version};
use grin_core::global;
use grin_servers::common::types::{Error, ServerInitStatus};
use grin_util::logger::LogEntry;

pub struct UI {
cursive: CursiveRunner<CursiveRunnable>,
ui_rx: mpsc::Receiver<UIMessage>,
ui_tx: mpsc::Sender<UIMessage>,
controller_tx: mpsc::Sender<ControllerMessage>,
logs_rx: mpsc::Receiver<LogEntry>,
logs_rx: Option<mpsc::Receiver<LogEntry>>,
}

fn modify_theme(theme: &mut Theme) {
Expand All @@ -65,7 +66,7 @@ impl UI {
/// Create a new UI
pub fn new(
controller_tx: mpsc::Sender<ControllerMessage>,
logs_rx: mpsc::Receiver<LogEntry>,
logs_rx: Option<mpsc::Receiver<LogEntry>>,
) -> UI {
let (ui_tx, ui_rx) = mpsc::channel::<UIMessage>();

Expand Down Expand Up @@ -122,6 +123,11 @@ impl UI {
// Configure a callback (shutdown, for the first test)
let controller_tx_clone = grin_ui.controller_tx.clone();
grin_ui.cursive.add_global_callback('q', move |c| {
let v = c.pop_layer();
Comment thread
ardocrat marked this conversation as resolved.
Outdated
if v.is_some() {
c.add_layer(v.unwrap());
return;
}
let content = StyledString::styled("Shutting down...", Color::Light(BaseColor::Yellow));
c.add_layer(CircularFocus::new(Dialog::around(TextView::new(content))).wrap_tab());
controller_tx_clone
Expand All @@ -139,8 +145,10 @@ impl UI {
return false;
}

while let Some(message) = self.logs_rx.try_iter().next() {
logs::TUILogsView::update(&mut self.cursive, message);
if let Some(logs_rx) = &self.logs_rx {
while let Some(message) = logs_rx.try_iter().next() {
logs::TUILogsView::update(&mut self.cursive, message);
}
}

// Process any pending UI messages
Expand Down Expand Up @@ -174,6 +182,8 @@ impl UI {
pub struct Controller {
rx: mpsc::Receiver<ControllerMessage>,
ui: UI,
serv_rx: mpsc::Receiver<ServerInitStatus>,
server: Option<Server>,
}

pub enum ControllerMessage {
Expand All @@ -182,16 +192,52 @@ pub enum ControllerMessage {

impl Controller {
/// Create a new controller
pub fn new(logs_rx: mpsc::Receiver<LogEntry>) -> Result<Controller, String> {
pub fn new(
logs_rx: Option<mpsc::Receiver<LogEntry>>,
serv_rx: mpsc::Receiver<ServerInitStatus>,
) -> Result<Controller, String> {
let (tx, rx) = mpsc::channel::<ControllerMessage>();
Ok(Controller {
rx,
ui: UI::new(tx, logs_rx),
serv_rx,
server: None,
})
}

/// Server initialization status.
pub fn init_status(&mut self, text: &str, pop: bool) {
if pop {
self.ui.cursive.pop_layer();
}
let content = StyledString::styled(text, Color::Light(BaseColor::Green));
self.ui
.cursive
.add_layer(CircularFocus::new(Dialog::around(TextView::new(content))).wrap_tab());
}

/// Server initialization error.
pub fn init_error(&mut self, e: Error) {
let content = StyledString::styled(format!("{:?}", e), Color::Light(BaseColor::Red));
self.ui.cursive.add_layer(
CircularFocus::new(Dialog::around(TextView::new(content)).button("Exit", |s| {
s.quit();
}))
.wrap_tab(),
);
}

/// Server UI after initialization.
pub fn server(&mut self, server: &Server) {
if let Ok(stats) = server.get_server_stats() {
self.ui.ui_tx.send(UIMessage::UpdateStatus(stats)).unwrap();
}
}

/// Run the controller
pub fn run(&mut self, server: Server) {
pub fn run(&mut self) {
self.init_status("Starting server...", false);

let stat_update_interval = 1;
let mut next_stat_update = Utc::now().timestamp() + stat_update_interval;
let delay = time::Duration::from_millis(50);
Expand All @@ -201,20 +247,33 @@ impl Controller {
ControllerMessage::Shutdown => {
warn!("Shutdown in progress, please wait");
self.ui.stop();
server.stop();
return;
break;
Comment thread
ardocrat marked this conversation as resolved.
}
}
}

if let Some(m) = self.serv_rx.try_iter().next() {
match m {
ServerInitStatus::LoadDatabase => self.init_status("Loading database...", true),
ServerInitStatus::StartSync => self.init_status("Start syncing...", true),
ServerInitStatus::StartAPI => self.init_status("Starting API...", true),
ServerInitStatus::FinishedLoading(s) => {
self.ui.cursive.pop_layer();
self.server = Some(s)
}
ServerInitStatus::ErrorLoading(e) => self.init_error(e),
}
}

if Utc::now().timestamp() > next_stat_update {
next_stat_update = Utc::now().timestamp() + stat_update_interval;
if let Ok(stats) = server.get_server_stats() {
self.ui.ui_tx.send(UIMessage::UpdateStatus(stats)).unwrap();
if let Some(server) = &self.server {
if let Ok(stats) = server.get_server_stats() {
self.ui.ui_tx.send(UIMessage::UpdateStatus(stats)).unwrap();
}
}
}
thread::sleep(delay);
}
server.stop();
}
}
Loading