How do I collect data from sparklyr in a Shiny reactive context?

organguy

I am unable to collect data (for plotting, say) from sparklyr (on a remote Spark cluster) within a Shiny reactive context. I can run all the code manually (in RStudio or an R console session), and if I run the same snippet to collect data in Shiny but outside a reactive context, it works fine.

Here is the ui.R file:

```r
library(shiny)
library(shinythemes)

shinyUI(fluidPage(theme = shinytheme("darkly"),

  titlePanel(HTML("Spark test")),

  sidebarLayout(
    sidebarPanel(
      selectInput("symbol", h4("Select symbol"), choices = c("")),
      actionButton("doclick", "Plot")
    ),

    mainPanel(
      tags$head(tags$style("#testplot{height:85vh !important;}")),
      plotOutput("testplot")
    )
  )
))
```

The server.R file is as follows:

```r
library(shiny)
library(dplyr)

server <- function(input, output, session) {

  source("sparktest.R")
  sc <<- connectToSpark()
  tdf <<- loadData()
  symlist <<- as.character(tdf %>% distinct(Sym) %>% pull())

  updateSelectInput(session, "symbol", choices = sort(as.character(symlist)))
  plotsym <- reactiveVal(0)

  #plotdf <- tdf %>% filter(Sym == "AA") %>%
  #  group_by(pdate) %>% summarise(dct = n()) %>% select(pdate, dct) %>%
  #  collect

  observeEvent(input$doclick, {
    plotsym(input$symbol)
    output$testplot <- renderPlot({
      #plotTest2(plotdf)
      plotTest(tdf, plotsym())
    })
  })

  on.exit(spark_disconnect_all())
}
```

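Lastly, the relevant section of the sparktest.R file is:

```r
library(dplyr)
library(ggplot2)

plotTest <- function(df, symbol) {
  # Filter in Spark, count rows per day, then bring the result into R
  plotdf <- df %>% filter(Sym == symbol) %>%
    group_by(pdate) %>% summarise(dct = n()) %>% select(pdate, dct) %>%
    collect()

  p1 <- plotdf %>% ggplot(aes(x = pdate)) +
    geom_line(aes(y = dct)) +
    ylab("Day count")
  p1
}

plotTest2 <- function(df) {
  # Same plot, for data that has already been collected
  p1 <- df %>% ggplot(aes(x = pdate)) +
    geom_line(aes(y = dct)) +
    ylab("Day count")
  p1
}
```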

Connectivity to Spark is fine, and the UI provides a simple selector for a stock symbol. When a symbol is selected and Plot is clicked, the Spark data is filtered for that symbol and a simple plot of daily counts is generated. Everything works when run manually, but the process fails when run through Shiny. The failure occurs at the `collect` step of the pipeline in `plotTest`. Here is the traceback:

```
Warning: Error in writeBin: invalid connection
Stack trace (innermost first):
126: writeBin
125: core_invoke_method
124: invoke_method.spark_shell_connection
123: invoke_method
122: invoke.shell_jobj
121: invoke
120: spark_version
119: create_hive_context.spark_shell_connection
118: create_hive_context
117: hive_context
116: invoke
115: .local
114: dbSendQuery
113: dbSendQuery
112: db_collect.DBIConnection
111: db_collect
110: collect.tbl_sql
109: collect
108: function_list[[k]]
107: withVisible
106: freduce
105: _fseq
104: eval
103: eval
102: withVisible
101: %>%
100: plotTest
 99: renderPlot [/home/rubedo/ShinyApps/sparktest/server.R#36]
 89: <reactive:plotObj>
 78: plotObj
 77: origRenderFunc
 76: output$testplot
  1: runApp
```

The same result occurs with anything along the lines of `collect`, whether `pull` or `as_tibble`: anything that reduces the Spark table to a plain, non-lazy data set.

In the server code, if I instead use the commented-out section, where the filtering and `collect` happen outside `observeEvent` and only the plotting of the filtered data happens inside it, things are fine. But as soon as I do the filtering and subsequent `collect` inside the reactive context, it fails.
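
The split between what works and what fails lines up with when `on.exit()` runs: it fires when the enclosing function returns, and a Shiny server function returns right after registering its observers, well before any click on Plot. The following pure-R sketch (no Shiny or Spark; `state`, `fake_server` and `callbacks` are hypothetical stand-ins for the Spark connection, `server()` and the observers) illustrates that timing:

```r
# Hypothetical stand-ins: `state$connected` plays the Spark connection,
# `callbacks` plays the observers that Shiny runs later.
state <- new.env()
state$connected <- FALSE
callbacks <- list()

fake_server <- function() {
  state$connected <- TRUE             # like sc <<- connectToSpark()
  on.exit(state$connected <- FALSE)   # like on.exit(spark_disconnect_all())
  # Like observeEvent(): store work that only runs later, after a "click".
  callbacks[[length(callbacks) + 1]] <<- function() state$connected
  # Work done here, during setup, still sees the live connection.
  state$connected
}

during_setup <- fake_server()      # TRUE: on.exit() has not fired yet
after_click <- callbacks[[1]]()    # FALSE: fake_server() returned, so on.exit() already ran
```

Work done while `fake_server()` is still running sees the connection open, while the deferred callback sees it closed, which mirrors the commented-out `collect` succeeding and the one inside `renderPlot` failing with `invalid connection`.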

Is there a fundamental incompatibility with doing this collection inside a reactive context, or is there a different method one needs to use? I have also tried running SQL directly via `dbGetQuery`, and that fails too. Anything that turns a lazy object into a non-lazy one inside a reactive context fails for me.
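
Because `on.exit()` inside `server()` runs as soon as `server()` returns, one possible change is to disconnect when the Shiny session ends instead, via `session$onSessionEnded()`. Below is a sketch of `server.R` along those lines, assuming `connectToSpark()`, `loadData()` and `plotTest()` from `sparktest.R` behave as in the code above and that sparklyr is installed; it has not been run against a real cluster. Calls carry `pkg::` prefixes only so the block defines cleanly on its own; attaching the packages with `library()` as in the original works equally well:

```r
server <- function(input, output, session) {
  source("sparktest.R")   # assumed to provide connectToSpark(), loadData(), plotTest()
  sc <<- connectToSpark()
  tdf <<- loadData()
  symlist <<- as.character(dplyr::pull(dplyr::distinct(tdf, Sym)))

  shiny::updateSelectInput(session, "symbol", choices = sort(as.character(symlist)))
  plotsym <- shiny::reactiveVal(0)

  shiny::observeEvent(input$doclick, {
    plotsym(input$symbol)
    output$testplot <- shiny::renderPlot({
      plotTest(tdf, plotsym())
    })
  })

  # Runs when this browser session closes, not when server() returns,
  # so the connection is still open when the Plot button is clicked.
  session$onSessionEnded(function() {
    sparklyr::spark_disconnect_all()
  })
}
```

Since `sc` is global and `spark_disconnect_all()` closes every connection, ending any one browser session also disconnects any other session that is still open.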

Any help is appreciated.

r, apache-spark, shiny, sparklyr
